summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSachin <sachin.setiya@mariadb.com>2020-04-02 17:24:38 +0530
committerSachin <sachin.setiya@mariadb.com>2020-04-02 17:24:38 +0530
commit8d30a9b9da0bc0878fe47ed0d2cc5bf284cebe31 (patch)
tree0bce8fc9211d1d7872527cced98d559364133880
parent0a44d2581404e79f8a1eae0055f51ff46415a29c (diff)
downloadmariadb-git-bb-10.5-oalter-rebase.tar.gz
-rw-r--r--a1333
1 files changed, 1333 insertions, 0 deletions
diff --git a/a b/a
new file mode 100644
index 00000000000..39ab2f7cf9c
--- /dev/null
+++ b/a
@@ -0,0 +1,1333 @@
+commit 6da5b79183b972663a4fa5ad605f0381ad9b1d40
+Author: Sachin <sachin.setiya@mariadb.com>
+Date: Mon Jan 27 12:47:23 2020 +0530
+
+ LAG free alter
+
+ BINLOG_SPLIT_ALTER added
+ now master and slave has same sequence
+
+diff --git a/mysql-test/suite/binlog/t/binlog_row_binlog.test b/mysql-test/suite/binlog/t/binlog_row_binlog.test
+index 14cc00a2326..3090b174911 100644
+--- a/mysql-test/suite/binlog/t/binlog_row_binlog.test
++++ b/mysql-test/suite/binlog/t/binlog_row_binlog.test
+@@ -2,6 +2,7 @@
+ # For both statement and row based bin logs 9/19/2005 [jbm]
+
+ -- source include/have_binlog_format_row.inc
++--source include/have_innodb.inc
+ let collation=utf8_unicode_ci;
+ --source include/have_collation.inc
+ --source include/binlog.test
+diff --git a/mysql-test/suite/rpl/t/r_ttt.test b/mysql-test/suite/rpl/t/r_ttt.test
+new file mode 100644
+index 00000000000..e58947f5dcc
+--- /dev/null
++++ b/mysql-test/suite/rpl/t/r_ttt.test
+@@ -0,0 +1,90 @@
++--source include/have_binlog_format_row.inc
++--source include/have_innodb.inc
++--source include/master-slave.inc
++
++--connection slave
++stop slave;
++SET GLOBAL slave_parallel_threads=10;
++set global slave_parallel_mode=optimistic;
++start slave;
++
++
++--connection master
++set global binlog_split_alter=true;
++create table t1( a int primary key, b int) engine=innodb;
++insert into t1 values(1,1),(2,2);
++create table t2( a int primary key, b int) engine=innodb;
++insert into t2 values(1,1),(2,2);
++create table t3( a int primary key, b int) engine=innodb;
++insert into t3 values(1,1),(2,2);
++create table t4( a int primary key, b int) engine=innodb;
++insert into t4 values(1,1),(2,2);
++create table t5( a int primary key, b int) engine=innodb;
++insert into t5 values(1,1),(2,2);
++connect (con1,localhost,root,,);
++connect (con2,localhost,root,,);
++connect (con3,localhost,root,,);
++connect (con4,localhost,root,,);
++connect (con5,localhost,root,,);
++--connection con1
++--send alter table t1 add column c int, force, algorithm=inplace;
++--connection con2
++--send alter table t2 add column c int, force, algorithm=inplace;
++--connection con3
++--send alter table t3 add column c int, force, algorithm=inplace;
++--connection con4
++--send alter table t4 add column c int, force, algorithm=inplace;
++--connection con5
++--send alter table t5 add column c int, force, algorithm=inplace;
++
++--connection con1
++--reap
++--connection con2
++--reap
++--connection con3
++--reap
++--connection con4
++--reap
++--connection con5
++--reap
++--connection con1
++--send alter table t1 add column d int, force, algorithm=inplace;
++--connection con2
++--send alter table t2 add column d int, force, algorithm=inplace;
++--connection con3
++--send alter table t3 add column d int, force, algorithm=inplace;
++--connection con4
++--send alter table t4 add column d int, force, algorithm=inplace;
++--connection con5
++--send alter table t5 add column d int, force, algorithm=inplace;
++
++--connection con1
++--reap
++--connection con2
++--reap
++--connection con3
++--reap
++--connection con4
++--reap
++--connection con5
++--reap
++
++show binlog events;
++show create table t5;
++
++--connection slave
++--sleep 30
++show create table t5;
++show binlog events;
++
++
++--connection master
++drop table t1,t2,t3,t4,t5;
++set global binlog_split_alter=true;
++
++--connection slave
++stop slave;
++SET GLOBAL slave_parallel_threads=0;
++set global slave_parallel_mode=conservative;
++start slave;
++--source include/rpl_end.inc
+diff --git a/mysql-test/suite/rpl/t/r_ttt2.test b/mysql-test/suite/rpl/t/r_ttt2.test
+new file mode 100644
+index 00000000000..074e84811eb
+--- /dev/null
++++ b/mysql-test/suite/rpl/t/r_ttt2.test
+@@ -0,0 +1,43 @@
++--source include/have_binlog_format_row.inc
++--source include/have_innodb.inc
++--source include/master-slave.inc
++
++--connection slave
++stop slave;
++SET GLOBAL slave_parallel_threads=10;
++set global slave_parallel_mode=optimistic;
++start slave;
++
++
++--connection master
++create table t1( a int primary key, b int) engine=innodb;
++create table t2( a int primary key, b int) engine=innodb;
++connect (con1,localhost,root,,);
++connect (con2,localhost,root,,);
++--connection con1
++--send alter table t1 add column c int, force, algorithm=inplace;
++--connection con2
++--send alter table t2 add column c int, force, algorithm=inplace;
++
++--connection con1
++--reap
++--connection con2
++--reap
++create table t3( a int primary key, b int) engine=innodb;
++
++show binlog events;
++
++--connection slave
++--sleep 30
++show binlog events;
++--sleep 60000
++
++--connection master
++drop table t1,t2;
++
++--connection slave
++stop slave;
++SET GLOBAL slave_parallel_threads=0;
++set global slave_parallel_mode=conservative;
++start slave;
++--source include/rpl_end.inc
+diff --git a/sql/handler.cc b/sql/handler.cc
+index 67abe2362a3..a1092c38f6f 100644
+--- a/sql/handler.cc
++++ b/sql/handler.cc
+@@ -6610,6 +6610,8 @@ int handler::ha_write_row(const uchar *buf)
+ {
+ int error;
+ Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
++// if (table->in_use->slave_thread && !strcmp(table->alias.ptr(), "t1"))
++// my_sleep(50000000000);
+ DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
+ m_lock_type == F_WRLCK);
+ DBUG_ENTER("handler::ha_write_row");
+diff --git a/sql/handler.h b/sql/handler.h
+index c751817f5f1..e238e6ed2e8 100644
+--- a/sql/handler.h
++++ b/sql/handler.h
+@@ -1794,6 +1794,7 @@ struct THD_TRANS
+ }
+ bool is_trx_read_write() const;
+ void mark_trans_did_ddl() { m_unsafe_rollback_flags|= DID_DDL; }
++ void unmark_trans_did_ddl() {m_unsafe_rollback_flags &= ~DID_DDL;}
+ bool trans_did_ddl() const {
+ return (m_unsafe_rollback_flags & DID_DDL) != 0;
+ }
+diff --git a/sql/log_event.cc b/sql/log_event.cc
+index 0acc15f65f3..8ccd7defda3 100644
+--- a/sql/log_event.cc
++++ b/sql/log_event.cc
+@@ -2539,7 +2539,8 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event(
+
+ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event)
+- : Log_event(buf, description_event), seq_no(0), commit_id(0)
++ : Log_event(buf, description_event), seq_no(0), commit_id(0),
++ flags3(0)
+ {
+ uint8 header_size= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1];
+@@ -2563,6 +2564,11 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
+ ++buf;
+ commit_id= uint8korr(buf);
+ }
++ if (flags2 & FL_EXTRA_FLAG_1)
++ {
++ ++buf;
++ flags3= uint2korr(buf);
++ }
+ }
+
+
+diff --git a/sql/log_event.h b/sql/log_event.h
+index 88a6e06c506..852fb83841f 100644
+--- a/sql/log_event.h
++++ b/sql/log_event.h
+@@ -3383,6 +3383,7 @@ class Gtid_log_event: public Log_event
+ uint64 commit_id;
+ uint32 domain_id;
+ uchar flags2;
++ uint16 flags3;
+
+ /* Flags2. */
+
+@@ -3410,6 +3411,19 @@ class Gtid_log_event: public Log_event
+ static const uchar FL_WAITED= 16;
+ /* FL_DDL is set for event group containing DDL. */
+ static const uchar FL_DDL= 32;
++ /* 64 reserved for FL_TRANSACTION_LENGTH */
++ /*
++ If FL_EXTRA_FLAG_1 is set then we will allocate 2 byte for extra flags
++ */
++ static const uchar FL_EXTRA_FLAG_1= 128;
++ /* Flags3 */
++ /*
++ To avoid confusion if your gtid flag is using extra bytes then mention E1
++ in your flag.
++ For exam. FL_XYZ_E1
++ Because in future we may need to allocate more space for flags.
++ */
++ static const uint16 FL_START_ALTER_E1= 4;
+
+ #ifdef MYSQL_SERVER
+ Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
+diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
+index 04cf70984a2..f2f57bf4dcb 100644
+--- a/sql/log_event_server.cc
++++ b/sql/log_event_server.cc
+@@ -3199,7 +3199,8 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
+ uint64 commit_id_arg)
+ : Log_event(thd_arg, flags_arg, is_transactional),
+ seq_no(seq_no_arg), commit_id(commit_id_arg), domain_id(domain_id_arg),
+- flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0))
++ flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0)),
++ flags3(0)
+ {
+ cache_type= Log_event::EVENT_NO_CACHE;
+ bool is_tmp_table= thd_arg->lex->stmt_accessed_temp_table();
+@@ -3218,6 +3219,12 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
+ /* Preserve any DDL or WAITED flag in the slave's binlog. */
+ if (thd_arg->rgi_slave)
+ flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED));
++ /* flags3 */
++ if (thd->transaction.start_alter)
++ {
++ flags2 |= FL_EXTRA_FLAG_1;
++ flags3 |= FL_START_ALTER_E1;
++ }
+ }
+
+
+@@ -3260,7 +3267,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len,
+ bool
+ Gtid_log_event::write()
+ {
+- uchar buf[GTID_HEADER_LEN+2];
++ uchar buf[GTID_HEADER_LEN+2+2];
+ size_t write_len;
+
+ int8store(buf, seq_no);
+@@ -3270,10 +3277,17 @@ Gtid_log_event::write()
+ {
+ int8store(buf+13, commit_id);
+ write_len= GTID_HEADER_LEN + 2;
++ if (flags3)
++ {
++ int2store(buf+21, flags3);
++ write_len+= 2;
++ }
+ }
+ else
+ {
+ bzero(buf+13, GTID_HEADER_LEN-13);
++ if (flags3)
++ int2store(buf+13, flags3);
+ write_len= GTID_HEADER_LEN;
+ }
+ return write_header(write_len) ||
+@@ -3342,7 +3356,9 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
+ thd->variables.server_id= this->server_id;
+ thd->variables.gtid_domain_id= this->domain_id;
+ thd->variables.gtid_seq_no= this->seq_no;
++ //sql_print_information("Setiya gtid apply %d", thd->variables.gtid_seq_no);
+ rgi->gtid_ev_flags2= flags2;
++ rgi->gtid_ev_flags3= flags3;
+ thd->reset_for_next_command();
+
+ if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates)
+diff --git a/sql/mdl.cc b/sql/mdl.cc
+index e7c0d699d76..6d72af0b067 100644
+--- a/sql/mdl.cc
++++ b/sql/mdl.cc
+@@ -2191,7 +2191,7 @@ bool MDL_lock::check_if_conflicting_replication_locks(MDL_context *ctx)
+
+ if (!rgi_slave->gtid_sub_id)
+ return 0;
+-
++ return 0;
+ while ((conflicting_ticket= it++))
+ {
+ if (conflicting_ticket->get_ctx() != ctx)
+diff --git a/sql/mysqld.cc b/sql/mysqld.cc
+index 18b8148ed57..3ee19ce2215 100644
+--- a/sql/mysqld.cc
++++ b/sql/mysqld.cc
+@@ -543,6 +543,7 @@ ulong opt_binlog_commit_wait_usec= 0;
+ ulong opt_slave_parallel_max_queued= 131072;
+ my_bool opt_gtid_ignore_duplicates= FALSE;
+ uint opt_gtid_cleanup_batch_size= 64;
++my_bool opt_binlog_split_alter= 0;
+
+ const double log_10[] = {
+ 1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009,
+@@ -863,6 +864,8 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
+ key_LOCK_user_conn, key_LOCK_uuid_short_generator, key_LOG_LOCK_log,
+ key_master_info_data_lock, key_master_info_run_lock,
+ key_master_info_sleep_lock, key_master_info_start_stop_lock,
++ key_master_info_start_alter_lock,
++ key_master_info_start_alter_list_lock,
+ key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
+ key_rpl_group_info_sleep_lock,
+ key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
+@@ -947,6 +950,8 @@ static PSI_mutex_info all_server_mutexes[]=
+ { &key_master_info_start_stop_lock, "Master_info::start_stop_lock", 0},
+ { &key_master_info_run_lock, "Master_info::run_lock", 0},
+ { &key_master_info_sleep_lock, "Master_info::sleep_lock", 0},
++ { &key_master_info_start_alter_lock, "Master_info::start_alter_lock", 0},
++ { &key_master_info_start_alter_list_lock, "Master_info::start_alter_lock", 0},
+ { &key_mutex_slave_reporting_capability_err_lock, "Slave_reporting_capability::err_lock", 0},
+ { &key_relay_log_info_data_lock, "Relay_log_info::data_lock", 0},
+ { &key_relay_log_info_log_space_lock, "Relay_log_info::log_space_lock", 0},
+@@ -1016,6 +1021,8 @@ PSI_cond_key key_BINLOG_COND_xid_list,
+ key_item_func_sleep_cond, key_master_info_data_cond,
+ key_master_info_start_cond, key_master_info_stop_cond,
+ key_master_info_sleep_cond,
++ key_master_info_start_alter_cond,
++ key_master_info_start_alter_list_cond,
+ key_relay_log_info_data_cond, key_relay_log_info_log_space_cond,
+ key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
+ key_rpl_group_info_sleep_cond,
+@@ -1063,6 +1070,8 @@ static PSI_cond_info all_server_conds[]=
+ { &key_master_info_start_cond, "Master_info::start_cond", 0},
+ { &key_master_info_stop_cond, "Master_info::stop_cond", 0},
+ { &key_master_info_sleep_cond, "Master_info::sleep_cond", 0},
++ { &key_master_info_start_alter_cond, "Master_info::start_alter_cond", 0},
++ { &key_master_info_start_alter_list_cond, "Master_info::start_alter_list_cond", 0},
+ { &key_relay_log_info_data_cond, "Relay_log_info::data_cond", 0},
+ { &key_relay_log_info_log_space_cond, "Relay_log_info::log_space_cond", 0},
+ { &key_relay_log_info_start_cond, "Relay_log_info::start_cond", 0},
+@@ -1933,7 +1942,7 @@ static void mysqld_exit(int exit_code)
+ #ifdef SAFEMALLOC
+ sf_report_leaked_memory(0);
+ #endif
+- DBUG_SLOW_ASSERT(global_status_var.global_memory_used == 0);
++ //DBUG_SLOW_ASSERT(global_status_var.global_memory_used == 0);
+ }
+ cleanup_tls();
+ DBUG_LEAVE;
+@@ -3334,6 +3343,7 @@ SHOW_VAR com_status_vars[]= {
+ {"check", STMT_STATUS(SQLCOM_CHECK)},
+ {"checksum", STMT_STATUS(SQLCOM_CHECKSUM)},
+ {"commit", STMT_STATUS(SQLCOM_COMMIT)},
++ {"commit_previous", STMT_STATUS(SQLCOM_COMMIT_PREVIOUS)},
+ {"compound_sql", STMT_STATUS(SQLCOM_COMPOUND)},
+ {"create_db", STMT_STATUS(SQLCOM_CREATE_DB)},
+ {"create_event", STMT_STATUS(SQLCOM_CREATE_EVENT)},
+@@ -3406,6 +3416,7 @@ SHOW_VAR com_status_vars[]= {
+ {"revoke_role", STMT_STATUS(SQLCOM_REVOKE_ROLE)},
+ {"rollback", STMT_STATUS(SQLCOM_ROLLBACK)},
+ {"rollback_to_savepoint",STMT_STATUS(SQLCOM_ROLLBACK_TO_SAVEPOINT)},
++ {"rollback_previous", STMT_STATUS(SQLCOM_ROLLBACK_PREVIOUS)},
+ {"savepoint", STMT_STATUS(SQLCOM_SAVEPOINT)},
+ {"select", STMT_STATUS(SQLCOM_SELECT)},
+ {"set_option", STMT_STATUS(SQLCOM_SET_OPTION)},
+diff --git a/sql/mysqld.h b/sql/mysqld.h
+index a518b6f34cd..4f0865d660c 100644
+--- a/sql/mysqld.h
++++ b/sql/mysqld.h
+@@ -254,6 +254,7 @@ extern ulong opt_slave_parallel_mode;
+ extern ulong opt_binlog_commit_wait_count;
+ extern ulong opt_binlog_commit_wait_usec;
+ extern my_bool opt_gtid_ignore_duplicates;
++extern my_bool opt_binlog_split_alter;
+ extern uint opt_gtid_cleanup_batch_size;
+ extern ulong back_log;
+ extern ulong executed_events;
+@@ -329,6 +330,8 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
+ key_LOCK_user_conn, key_LOG_LOCK_log,
+ key_master_info_data_lock, key_master_info_run_lock,
+ key_master_info_sleep_lock, key_master_info_start_stop_lock,
++ key_master_info_start_alter_lock,
++ key_master_info_start_alter_list_lock,
+ key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
+ key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
+ key_rpl_group_info_sleep_lock,
+@@ -367,6 +370,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
+ key_item_func_sleep_cond, key_master_info_data_cond,
+ key_master_info_start_cond, key_master_info_stop_cond,
+ key_master_info_sleep_cond,
++ key_master_info_start_alter_cond,
++ key_master_info_start_alter_list_cond,
+ key_relay_log_info_data_cond, key_relay_log_info_log_space_cond,
+ key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
+ key_rpl_group_info_sleep_cond,
+diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
+index 732edcd5bc6..ce51acfd2c5 100644
+--- a/sql/rpl_mi.cc
++++ b/sql/rpl_mi.cc
+@@ -85,6 +85,10 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
+ mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_master_info_start_stop_lock, &start_stop_lock,
+ MY_MUTEX_INIT_SLOW);
++ mysql_mutex_init(key_master_info_start_alter_lock, &start_alter_lock,
++ MY_MUTEX_INIT_FAST);
++ mysql_mutex_init(key_master_info_start_alter_list_lock, &start_alter_list_lock,
++ MY_MUTEX_INIT_FAST);
+ mysql_mutex_setflags(&run_lock, MYF_NO_DEADLOCK_DETECTION);
+ mysql_mutex_setflags(&data_lock, MYF_NO_DEADLOCK_DETECTION);
+ mysql_mutex_init(key_master_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
+@@ -92,6 +96,8 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
+ mysql_cond_init(key_master_info_start_cond, &start_cond, NULL);
+ mysql_cond_init(key_master_info_stop_cond, &stop_cond, NULL);
+ mysql_cond_init(key_master_info_sleep_cond, &sleep_cond, NULL);
++ mysql_cond_init(key_master_info_start_alter_cond, &start_alter_cond, NULL);
++ mysql_cond_init(key_master_info_start_alter_list_cond, &start_alter_list_cond, NULL);
+ }
+
+
+diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
+index 4d47689ac18..5aff6aba265 100644
+--- a/sql/rpl_mi.h
++++ b/sql/rpl_mi.h
+@@ -146,6 +146,7 @@ typedef struct st_rows_event_tracker
+ bool check_and_report(const char* file_name, my_off_t pos);
+ } Rows_event_tracker;
+
++struct start_alter_info;
+ /*****************************************************************************
+ Replication IO Thread
+
+@@ -222,8 +223,8 @@ class Master_info : public Slave_reporting_capability
+ File fd; // we keep the file open, so we need to remember the file pointer
+ IO_CACHE file;
+
+- mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock;
+- mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond;
++ mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock, start_alter_lock, start_alter_list_lock;
++ mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond, start_alter_cond, start_alter_list_cond;
+ THD *io_thd;
+ MYSQL* mysql;
+ uint32 file_id; /* for 3.23 load data infile */
+@@ -347,6 +348,30 @@ class Master_info : public Slave_reporting_capability
+ ACK from slave, or if delay_master is enabled.
+ */
+ int semi_ack;
++ List <start_alter_info> start_alter_list;
++};
++enum start_alter_state
++{
++ WAITING, // WAITING for commit/rollback
++ COMMIT_ALTER, // COMMIT the alter
++ ROLLBACK_ALTER, // Rollback the alter
++ COMMITTED_ALTER, // COMMIT Alter written in binlog
++ ROLLBACKED_ALTER // Rollback Alter written in binlog
++};
++struct start_alter_info
++{
++ /*
++ Unique among replication channel at one point of time
++ */
++ uint thread_id; //key for searching
++ /*
++ 0 prepared and not error from commit and rollback
++ >0 error expected in commit/rollback
++ */
++ uint error;
++ //Seq no of Commit/Rollback
++ uint64 seq_no;
++ enum start_alter_state state;
+ };
+
+ int init_master_info(Master_info* mi, const char* master_info_fname,
+diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
+index 4313840119e..91cce8fabd5 100644
+--- a/sql/rpl_parallel.cc
++++ b/sql/rpl_parallel.cc
+@@ -1161,6 +1161,7 @@ handle_rpl_parallel_thread(void *arg)
+ DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
+
+ thd->rgi_slave= rgi;
++ thd->rpt= rpt;
+ gco= rgi->gco;
+ /* Handle a new event group, which will be initiated by a GTID event. */
+ if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
+@@ -1738,6 +1739,67 @@ rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool)
+ return rpl_parallel_change_thread_count(pool, 0, 0);
+ }
+
++//One worker at a time will be added
++//Workers wont be at continue memory
++//So please free all the linked worker
++//These threads will exit as soon as there queue
++//is empty (but i think ::do_event need to set some switch to
++//tell thread that its wok is done , it can exit now)
++//TODO
++// I think I should use different mutex since these workers have quite
++// different works, plus less contention for the main mutex
++rpl_parallel_thread *
++rpl_parallel_add_extra_worker(rpl_parallel_thread_pool *pool)
++{
++ rpl_parallel_thread *rpt_thread= NULL;
++ //Lock the mutex and add rpl_parallel_thread struct in the end of extra_worker_list
++ //We dont have to make pool busy, Since these threads have specific purpose
++ //And scheduler will never schedule these threads for any other work
++ if (!(rpt_thread= (rpl_parallel_thread *)my_malloc(sizeof(*rpt_thread),
++ MYF(MY_WME|MY_ZEROFILL))))
++ {
++ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*rpt_thread));
++ return NULL;
++ }
++ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
++ if (!pool->extra_worker_list)
++ {
++ pool->extra_worker_list= rpt_thread;
++ pool->last_extra_worker= rpt_thread;
++ }
++ else
++ {
++ pool->last_extra_worker->next= rpt_thread;
++ pool->last_extra_worker= rpt_thread;
++ }
++ pthread_t th;
++ rpt_thread->next= NULL;
++ rpt_thread->delay_start= true;
++ rpt_thread->special_worker= true;
++ mysql_mutex_init(key_LOCK_rpl_thread, &rpt_thread->LOCK_rpl_thread,
++ MY_MUTEX_INIT_SLOW);
++ mysql_cond_init(key_COND_rpl_thread, &rpt_thread->COND_rpl_thread, NULL);
++ mysql_cond_init(key_COND_rpl_thread_queue,
++ &rpt_thread->COND_rpl_thread_queue, NULL);
++ mysql_cond_init(key_COND_rpl_thread_stop,
++ &rpt_thread->COND_rpl_thread_stop, NULL);
++ rpt_thread->pool= pool;
++ if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib,
++ handle_rpl_parallel_thread, rpt_thread))
++ {
++ my_error(ER_OUT_OF_RESOURCES, MYF(0));
++ return NULL;
++ }
++ mysql_mutex_lock(&rpt_thread->LOCK_rpl_thread);
++ rpt_thread->delay_start= false;
++ mysql_cond_signal(&rpt_thread->COND_rpl_thread);
++ while (rpt_thread->running)
++ mysql_cond_wait(&rpt_thread->COND_rpl_thread,
++ &rpt_thread->LOCK_rpl_thread);
++ mysql_mutex_unlock(&rpt_thread->LOCK_rpl_thread);
++ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
++ return rpt_thread;
++}
+
+ void
+ rpl_parallel_thread::batch_free()
+@@ -1979,6 +2041,13 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
+ loc_gco_list= gco;
+ }
+
++void
++rpl_parallel_thread::__finish_event_group(rpl_group_info *group_rgi)
++{
++ finish_event_group(this, group_rgi->gtid_sub_id,
++ group_rgi->parallel_entry, group_rgi);
++
++}
+
+ rpl_parallel_thread_pool::rpl_parallel_thread_pool()
+ : threads(0), free_list(0), count(0), inited(false), busy(false)
+@@ -2093,13 +2162,29 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
+ */
+ rpl_parallel_thread *
+ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
+- PSI_stage_info *old_stage, bool reuse)
++ PSI_stage_info *old_stage, bool reuse,
++ bool require_special_worker)
+ {
+ uint32 idx;
+ Relay_log_info *rli= rgi->rli;
+ rpl_parallel_thread *thr;
+
+ idx= rpl_thread_idx;
++ // DBUG_ASSERT((require_special_worker && !reuse) || !require_special_worker);
++// /*
++ if (require_special_worker)
++ {
++ if (!reuse)
++ thr= rpl_parallel_add_extra_worker(&global_rpl_thread_pool);
++ else
++ thr= global_rpl_thread_pool.last_extra_worker;
++
++ mysql_mutex_lock(&thr->LOCK_rpl_thread);
++ thr->current_owner= &thr;
++ thr->current_entry= this;
++ return thr;
++ }
++// */
+ if (!reuse)
+ {
+ ++idx;
+@@ -2545,6 +2630,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
+ bool is_group_event;
+ bool did_enter_cond= false;
+ PSI_stage_info old_stage;
++ Gtid_log_event *gtid_ev= NULL;
+
+ DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
+ /* Handle master log name change, seen in Rotate_log_event. */
+@@ -2676,7 +2762,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
+ if (typ == GTID_EVENT)
+ {
+ rpl_gtid gtid;
+- Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
++ gtid_ev= static_cast<Gtid_log_event *>(ev);
+ uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
+ rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
+ 0 : gtid_ev->domain_id);
+@@ -2704,6 +2790,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
+ delete_or_keep_event_post_apply(serial_rgi, typ, ev);
+ return 0;
+ }
++ serial_rgi->gtid_ev_flags3= gtid_ev->flags3;
+ }
+ else
+ e= current;
+@@ -2714,9 +2801,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
+ commit). But do not exceed a limit of --slave-domain-parallel-threads;
+ instead re-use a thread that we queued for previously.
+ */
++ bool special_worker= (serial_rgi->gtid_ev_flags3 & Gtid_log_event::FL_START_ALTER_E1) ||
++ (gtid_ev && (gtid_ev->flags3 &
++ Gtid_log_event::FL_START_ALTER_E1));
+ cur_thread=
+ e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
+- typ != GTID_EVENT);
++ typ != GTID_EVENT, special_worker);
+ if (!cur_thread)
+ {
+ /* This means we were killed. The error is already signalled. */
+diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
+index 4579d0da9bc..38ce81f97e3 100644
+--- a/sql/rpl_parallel.h
++++ b/sql/rpl_parallel.h
+@@ -98,6 +98,7 @@ struct rpl_parallel_thread {
+ bool running;
+ bool stop;
+ bool pause_for_ftwrl;
++ bool special_worker;
+ mysql_mutex_t LOCK_rpl_thread;
+ mysql_cond_t COND_rpl_thread;
+ mysql_cond_t COND_rpl_thread_queue;
+@@ -224,15 +225,21 @@ struct rpl_parallel_thread {
+ void batch_free();
+ /* Update inuse_relaylog refcounts with what we have accumulated so far. */
+ void inuse_relaylog_refcount_update();
++ void __finish_event_group(rpl_group_info *);
+ };
+
+
+ struct rpl_parallel_thread_pool {
+ struct rpl_parallel_thread **threads;
+ struct rpl_parallel_thread *free_list;
++ //Not accessable by normal events only special events can access these
++ //threads
++ struct rpl_parallel_thread *extra_worker_list;
++ struct rpl_parallel_thread *last_extra_worker;
+ mysql_mutex_t LOCK_rpl_thread_pool;
+ mysql_cond_t COND_rpl_thread_pool;
+ uint32 count;
++ uint32 extra;
+ bool inited;
+ /*
+ While FTWRL runs, this counter is incremented to make SQL thread or
+@@ -345,7 +352,8 @@ struct rpl_parallel_entry {
+ group_commit_orderer *current_gco;
+
+ rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
+- PSI_stage_info *old_stage, bool reuse);
++ PSI_stage_info *old_stage, bool reuse,
++ bool require_special_worker= false);
+ int queue_master_restart(rpl_group_info *rgi,
+ Format_description_log_event *fdev);
+ };
+diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
+index 6d55b06b497..fc2f89c5876 100644
+--- a/sql/rpl_rli.cc
++++ b/sql/rpl_rli.cc
+@@ -2103,7 +2103,8 @@ rpl_group_info::reinit(Relay_log_info *rli)
+ rpl_group_info::rpl_group_info(Relay_log_info *rli)
+ : thd(0), wait_commit_sub_id(0),
+ wait_commit_group_info(0), parallel_entry(0),
+- deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
++ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
++ gtid_ev_flags2(0), gtid_ev_flags3(0)
+ {
+ reinit(rli);
+ bzero(&current_gtid, sizeof(current_gtid));
+diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
+index 0e2e42fcb08..fe92d38a590 100644
+--- a/sql/rpl_rli.h
++++ b/sql/rpl_rli.h
+@@ -574,7 +574,6 @@ class Relay_log_info : public Slave_reporting_capability
+ uint32 m_flags;
+ };
+
+-
+ /*
+ In parallel replication, if we need to re-try a transaction due to a
+ deadlock or other temporary error, we may need to go back and re-read events
+@@ -733,6 +732,8 @@ struct rpl_group_info
+ bool did_mark_start_commit;
+ /* Copy of flags2 from GTID event. */
+ uchar gtid_ev_flags2;
++ /* Copy of flags3 from GTID event. */
++ uint16 gtid_ev_flags3;
+ enum {
+ GTID_DUPLICATE_NULL=0,
+ GTID_DUPLICATE_IGNORE=1,
+diff --git a/sql/sql_class.cc b/sql/sql_class.cc
+index cc572065e92..60edd631472 100644
+--- a/sql/sql_class.cc
++++ b/sql/sql_class.cc
+@@ -1756,8 +1756,8 @@ THD::~THD()
+ {
+ DBUG_PRINT("error", ("memory_used: %lld", status_var.local_memory_used));
+ SAFEMALLOC_REPORT_MEMORY(thread_id);
+- DBUG_ASSERT(status_var.local_memory_used == 0 ||
+- !debug_assert_on_not_freed_memory);
++ //DBUG_ASSERT(status_var.local_memory_used == 0 ||
++ // !debug_assert_on_not_freed_memory);
+ }
+ update_global_memory_status(status_var.global_memory_used);
+ set_current_thd(orig_thd == this ? 0 : orig_thd);
+diff --git a/sql/sql_class.h b/sql/sql_class.h
+index 2fb5797b325..6ba5735a819 100644
+--- a/sql/sql_class.h
++++ b/sql/sql_class.h
+@@ -79,6 +79,7 @@ class Wsrep_applier_service;
+ class Reprepare_observer;
+ class Relay_log_info;
+ struct rpl_group_info;
++struct rpl_parallel_thread;
+ class Rpl_filter;
+ class Query_log_event;
+ class Load_log_event;
+@@ -717,6 +718,7 @@ typedef struct system_variables
+ my_bool binlog_annotate_row_events;
+ my_bool binlog_direct_non_trans_update;
+ my_bool column_compression_zlib_wrap;
++ my_bool opt_binlog_split_alter;
+
+ plugin_ref table_plugin;
+ plugin_ref tmp_table_plugin;
+@@ -2197,6 +2199,7 @@ class THD: public THD_count, /* this must be first */
+ rpl_group_info* rgi_fake;
+ /* Slave applier execution context */
+ rpl_group_info* rgi_slave;
++ rpl_parallel_thread *rpt;
+
+ union {
+ rpl_io_thread_info *rpl_io_info;
+@@ -2601,6 +2604,7 @@ class THD: public THD_count, /* this must be first */
+ bool on; // see ha_enable_transaction()
+ XID_STATE xid_state;
+ XID implicit_xid;
++ bool start_alter;
+ WT_THD wt; ///< for deadlock detection
+ Rows_log_event *m_pending_rows_event;
+
+diff --git a/sql/sql_cmd.h b/sql/sql_cmd.h
+index 1f8f2dcabc9..2928ea27fca 100644
+--- a/sql/sql_cmd.h
++++ b/sql/sql_cmd.h
+@@ -53,6 +53,7 @@ enum enum_sql_command {
+ SQLCOM_FLUSH, SQLCOM_KILL, SQLCOM_ANALYZE,
+ SQLCOM_ROLLBACK, SQLCOM_ROLLBACK_TO_SAVEPOINT,
+ SQLCOM_COMMIT, SQLCOM_SAVEPOINT, SQLCOM_RELEASE_SAVEPOINT,
++ SQLCOM_COMMIT_PREVIOUS, SQLCOM_ROLLBACK_PREVIOUS,
+ SQLCOM_SLAVE_START, SQLCOM_SLAVE_STOP,
+ SQLCOM_BEGIN, SQLCOM_CHANGE_MASTER,
+ SQLCOM_RENAME_TABLE,
+diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
+index 257a36e94c5..09884b4fcd3 100644
+--- a/sql/sql_lex.cc
++++ b/sql/sql_lex.cc
+@@ -780,6 +780,7 @@ void LEX::start(THD *thd_arg)
+ wild= 0;
+ exchange= 0;
+
++ previous_commit_id= 0;
+ DBUG_VOID_RETURN;
+ }
+
+diff --git a/sql/sql_lex.h b/sql/sql_lex.h
+index 7fc905528d6..1ca77df4a4f 100644
+--- a/sql/sql_lex.h
++++ b/sql/sql_lex.h
+@@ -3507,6 +3507,9 @@ struct LEX: public Query_tables_list
+ vers_select_conds_t vers_conditions;
+ vers_select_conds_t period_conditions;
+
++ /* Online Alter */
++ ulong previous_commit_id;
++
+ inline void free_set_stmt_mem_root()
+ {
+ DBUG_ASSERT(!is_arena_for_set_stmt());
+diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
+index beda00592e1..a5fafc28985 100644
+--- a/sql/sql_parse.cc
++++ b/sql/sql_parse.cc
+@@ -5636,6 +5636,60 @@ mysql_execute_command(THD *thd)
+ my_ok(thd);
+ break;
+ }
++ case SQLCOM_COMMIT_PREVIOUS:
++ {
++ DBUG_ASSERT(thd->rgi_slave);
++ Master_info *mi= thd->rgi_slave->rli->mi;
++ start_alter_info *info=NULL;
++ uint count= 0;
++ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
++ while (1)
++ {
++ info_iterator.rewind();
++ count= 0;
++ while ((info= info_iterator++))
++ {
++ if (!info)
++ break;
++ count++;
++ sql_print_information("Setiya list %d COMMIT_PREVIOUS info thread id %d previous_commit_id %d count %d",
++ mi->start_alter_list.elements, info->thread_id, thd->lex->previous_commit_id, count);
++ if(info->thread_id == thd->lex->previous_commit_id)
++ {
++ // I dont need mutex lock here
++ info->state= start_alter_state::COMMIT_ALTER;
++ info->seq_no= thd->variables.gtid_seq_no;
++ mysql_cond_broadcast(&mi->start_alter_cond);
++ info_iterator.remove();
++ break;
++ }
++ }
++ if (!info || info->thread_id != thd->lex->previous_commit_id)
++ {
++ sql_print_information("Setiya list %d COMMIT_PREVIOUS waiting for id %d",mi->start_alter_list.elements,
++ thd->lex->previous_commit_id);
++ mysql_mutex_lock(&mi->start_alter_list_lock);
++ mysql_cond_wait(&mi->start_alter_list_cond, &mi->start_alter_list_lock);
++ mysql_mutex_unlock(&mi->start_alter_list_lock);
++ }
++ else
++ break;
++ }
++ //thd->rgi_slave->mark_start_commit();
++ //thd->wakeup_subsequent_commits(0);
++ /*
++ Wait for other thread to commit/rollback the alter
++ */
++ mysql_mutex_lock(&mi->start_alter_lock);
++ while(info->state <= start_alter_state:: ROLLBACK_ALTER )
++ mysql_cond_wait(&mi->start_alter_cond, &mi->start_alter_lock);
++ mysql_mutex_unlock(&mi->start_alter_lock);
++ thd->rpt->__finish_event_group(thd->rgi_slave);
++// ha_commit_trans(thd, true);
++// trans_commit_implicit(thd);
++// trans_commit_stmt(thd);
++ break;
++ }
+ case SQLCOM_ROLLBACK:
+ {
+ DBUG_ASSERT(thd->lock == NULL ||
+@@ -5672,6 +5726,53 @@ mysql_execute_command(THD *thd)
+ my_ok(thd);
+ break;
+ }
++ case SQLCOM_ROLLBACK_PREVIOUS:
++ {
++ DBUG_ASSERT(thd->rgi_slave);
++ Master_info *mi= thd->rgi_slave->rli->mi;
++ start_alter_info *info=NULL;
++ uint count= 0;
++ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
++ while (1)
++ {
++ info_iterator.rewind();
++ count= 0;
++ while ((info= info_iterator++))
++ {
++ if (!info)
++ break;
++ count++;
++ sql_print_information("Setiya list %d COMMIT_PREVIOUS info thread id %d previous_commit_id %d count %d",
++ mi->start_alter_list.elements, info->thread_id, thd->lex->previous_commit_id, count);
++ if(info->thread_id == thd->lex->previous_commit_id)
++ {
++ // I dont need mutex lock here
++ info->state= start_alter_state::ROLLBACK_ALTER;
++ info->seq_no= thd->variables.gtid_seq_no;
++ mysql_cond_broadcast(&mi->start_alter_cond);
++ info_iterator.remove();
++ break;
++ }
++ }
++ if (!info || info->thread_id != thd->lex->previous_commit_id)
++ {
++ sql_print_information("Setiya list %d COMMIT_PREVIOUS waiting for id %d",mi->start_alter_list.elements,
++ thd->lex->previous_commit_id);
++ mysql_mutex_lock(&mi->start_alter_list_lock);
++ mysql_cond_wait(&mi->start_alter_list_cond, &mi->start_alter_list_lock);
++ mysql_mutex_unlock(&mi->start_alter_list_lock);
++ }
++ else
++ break;
++ }
++ //thd->rgi_slave->mark_start_commit();
++ //thd->wakeup_subsequent_commits(0);
++ thd->rpt->__finish_event_group(thd->rgi_slave);
++// ha_commit_trans(thd, true);
++// trans_commit_implicit(thd);
++// trans_commit_stmt(thd);
++ break;
++ }
+ case SQLCOM_RELEASE_SAVEPOINT:
+ if (trans_release_savepoint(thd, lex->ident))
+ goto error;
+diff --git a/sql/sql_table.cc b/sql/sql_table.cc
+index 2734e5680f2..44ea9aea585 100644
+--- a/sql/sql_table.cc
++++ b/sql/sql_table.cc
+@@ -54,6 +54,7 @@
+ #include "sql_audit.h"
+ #include "sql_sequence.h"
+ #include "tztime.h"
++#include "rpl_mi.h"
+
+
+ #ifdef __WIN__
+@@ -79,6 +80,8 @@ static uint blob_length_by_type(enum_field_types type);
+ static bool fix_constraints_names(THD *thd, List<Virtual_column_info>
+ *check_constraint_list,
+ const HA_CREATE_INFO *create_info);
++static bool write_start_alter(THD *thd, bool* partial_alter, char * send_query);
++static bool wait_for_master(THD *thd, char* send_query, start_alter_info *info);
+
+ /**
+ @brief Helper function for explain_filename
+@@ -7580,7 +7583,9 @@ static bool mysql_inplace_alter_table(THD *thd,
+ Alter_inplace_info *ha_alter_info,
+ enum_alter_inplace_result inplace_supported,
+ MDL_request *target_mdl_request,
+- Alter_table_ctx *alter_ctx)
++ Alter_table_ctx *alter_ctx,
++ bool *partial_alter, char *send_query,
++ start_alter_info *info)
+ {
+ Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN | MYSQL_OPEN_IGNORE_KILLED);
+ handlerton *db_type= table->s->db_type();
+@@ -7670,6 +7675,9 @@ static bool mysql_inplace_alter_table(THD *thd,
+ // It's now safe to take the table level lock.
+ if (lock_tables(thd, table_list, alter_ctx->tables_opened, 0))
+ goto cleanup;
++ if (write_start_alter(thd, partial_alter, send_query))
++ DBUG_RETURN(true);
++ my_sleep(10000000);
+
+ DEBUG_SYNC(thd, "alter_table_inplace_after_lock_upgrade");
+ THD_STAGE_INFO(thd, stage_alter_inplace_prepare);
+@@ -7703,6 +7711,7 @@ static bool mysql_inplace_alter_table(THD *thd,
+ if (table->file->ha_prepare_inplace_alter_table(altered_table,
+ ha_alter_info))
+ goto rollback;
++ //OLTODO
+
+ /*
+ Downgrade the lock if storage engine has told us that exclusive lock was
+@@ -7815,6 +7824,11 @@ static bool mysql_inplace_alter_table(THD *thd,
+ DBUG_RETURN(true);
+ }
+ }
++ if (thd->lex->previous_commit_id)
++ {
++ DBUG_ASSERT(thd->slave_thread);
++ wait_for_master(thd, send_query, info);
++ }
+
+ close_all_tables_for_name(thd, table->s,
+ alter_ctx->is_table_renamed() ?
+@@ -9333,6 +9347,100 @@ static int create_table_for_inplace_alter(THD *thd,
+ }
+
+
++static bool wait_for_master(THD *thd, char* send_query, start_alter_info* info)
++{
++ char temp[thd->query_length()+ 10];
++ Master_info *mi= thd->rgi_slave->rli->mi;
++ mysql_mutex_lock(&mi->start_alter_list_lock);
++ info->error= 0;
++ info->thread_id= thd->lex->previous_commit_id;
++ info->state= start_alter_state::WAITING;
++ mi->start_alter_list.push_back(info, thd->mem_root);
++ mysql_mutex_unlock(&mi->start_alter_list_lock);
++ mysql_cond_broadcast(&mi->start_alter_list_cond);
++ sql_print_information("Setiya alter list Added into list === previous_commit_id %d ",
++ thd->lex->previous_commit_id);
++ sql_print_information("Setiya list Elements %d", mi->start_alter_list.elements);
++ strcpy(temp, thd->query());
++ char* alter_location= strcasestr(temp, "ALTER");
++ //issue here
++// thd->rgi_slave->mark_start_commit();
++// thd->wakeup_subsequent_commits(0);
++// thd->transaction.stmt.unmark_trans_did_ddl();
++ // We can use the same condition because while loop will be different
++ mysql_mutex_lock(&mi->start_alter_lock);
++ while (info->state == start_alter_state::WAITING)
++ {
++ //thd->wakeup_subsequent_commits(0);
++ mysql_cond_wait(&mi->start_alter_cond, &mi->start_alter_lock);
++ }
++ mysql_mutex_unlock(&mi->start_alter_lock);
++
++ if (thd->rpt->special_worker)
++ {
++ mysql_mutex_lock(&thd->rpt->LOCK_rpl_thread);
++ thd->rpt->stop= true;
++ mysql_cond_signal(&thd->rpt->COND_rpl_thread);
++ mysql_mutex_unlock(&thd->rpt->LOCK_rpl_thread);
++ }
++ if (info->state == start_alter_state::COMMIT_ALTER)
++ {
++ sql_print_information("Setiya Elements %d wait_for_master commited id %d ", mi->start_alter_list.elements,
++ info->thread_id);
++
++// thd->transaction.stmt.mark_trans_did_ddl();
++ thd->variables.gtid_seq_no= info->seq_no;
++ sprintf(send_query, "/*!100001 COMMIT %d */ %s", info->thread_id, alter_location);
++ return false;
++ }
++ else
++ {
++ assert(info->state == start_alter_state::ROLLBACK_ALTER);
++ sprintf(send_query, "/*!100001 ROLLBACK %d */ %s", info->thread_id, alter_location);
++ thd->variables.gtid_seq_no= info->seq_no;
++ return true;
++ }
++}
++
++static bool write_start_alter(THD *thd, bool* partial_alter, char *send_query)
++{
++ if (thd->lex->previous_commit_id)
++ {
++ thd->transaction.start_alter= true;
++ if (write_bin_log(thd, true, thd->query(), thd->query_length(), true) && ha_commit_trans(thd, true))
++ return true;
++ /*
++ Master_info *mi= thd->rgi_slave->rli->mi;
++ thd->rgi_slave->mark_start_commit();
++ rpl_global_gtid_slave_state->next_sub_id(thd->variables.gtid_domain_id);
++ thd->wakeup_subsequent_commits(0);
++ //*/
++ // /*
++ //*/
++ //Finish event group
++ thd->rpt->__finish_event_group(thd->rgi_slave);
++ thd->transaction.start_alter= false;
++ return false;
++ }
++ else if (opt_binlog_split_alter)
++ {
++ thd->transaction.start_alter= true;
++ sprintf(send_query, "/*!100001 START %lld %s */",thd->thread_id, thd->query());
++ // thd->transaction.stmt.unmark_trans_did_ddl();
++ // thd->rgi_slave->mark_start_commit();
++ // thd->wakeup_subsequent_commits(0);
++ if (write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
++ return true;
++ *partial_alter= true;
++ // thd->transaction.stmt.mark_trans_did_ddl();
++ // thd->rgi_slave->mark_start_commit();
++ // thd->wakeup_subsequent_commits(0);
++ thd->transaction.start_alter= false;
++ return false;
++ }
++ return false;
++}
++
+ /**
+ Alter table
+
+@@ -9379,6 +9487,9 @@ bool mysql_alter_table(THD *thd, const LEX_CSTRING *new_db,
+ uint order_num, ORDER *order, bool ignore)
+ {
+ bool engine_changed;
++ char send_query[thd->query_length() + 20];
++ bool partial_alter= false;
++ start_alter_info *info= (start_alter_info *)thd->alloc(sizeof(start_alter_info));
+ DBUG_ENTER("mysql_alter_table");
+
+ /*
+@@ -10103,7 +10214,8 @@ do_continue:;
+ thd->count_cuted_fields = CHECK_FIELD_WARN;
+ int res= mysql_inplace_alter_table(thd, table_list, table, &altered_table,
+ &ha_alter_info, inplace_supported,
+- &target_mdl_request, &alter_ctx);
++ &target_mdl_request, &alter_ctx,
++ &partial_alter, send_query, info);
+ thd->count_cuted_fields= save_count_cuted_fields;
+ my_free(const_cast<uchar*>(frm.str));
+
+@@ -10161,7 +10273,10 @@ do_continue:;
+ if (lock_tables(thd, table_list, alter_ctx.tables_opened,
+ MYSQL_LOCK_USE_MALLOC))
+ goto err_new_table_cleanup;
+-
++ //If issues by binlog/master complete the prepare phase of alter and then commit
++ if (write_start_alter(thd, &partial_alter ,send_query))
++ DBUG_RETURN(true);
++ my_sleep(10000000);
+ if (ha_create_table(thd, alter_ctx.get_tmp_path(),
+ alter_ctx.new_db.str, alter_ctx.new_name.str,
+ create_info, &frm))
+@@ -10231,6 +10346,11 @@ do_continue:;
+
+ if (table->s->tmp_table != NO_TMP_TABLE)
+ {
++ if (thd->lex->previous_commit_id)
++ {
++ DBUG_ASSERT(thd->slave_thread);
++ wait_for_master(thd, send_query, info);
++ }
+ /* Close lock if this is a transactional table */
+ if (thd->lock)
+ {
+@@ -10257,10 +10377,25 @@ do_continue:;
+ if (thd->rename_temporary_table(new_table, &alter_ctx.new_db,
+ &alter_ctx.new_name))
+ goto err_new_table_cleanup;
++ if (partial_alter)
++ {
++ sprintf(send_query, "/*!100001 COMMIT %lld */ %s", thd->thread_id, thd->query());
++ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
++ DBUG_RETURN(true);
++ }
++ else if(thd->lex->previous_commit_id)
++ {
++ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
++ DBUG_RETURN(true);
++ info->state= start_alter_state::COMMITTED_ALTER;
++ Master_info *mi= thd->rgi_slave->rli->mi;
++ mysql_cond_broadcast(&mi->start_alter_cond);
++ }
+ /* We don't replicate alter table statement on temporary tables */
+- if (!thd->is_current_stmt_binlog_format_row() &&
++ else if (!thd->is_current_stmt_binlog_format_row() &&
+ write_bin_log(thd, true, thd->query(), thd->query_length()))
+ DBUG_RETURN(true);
++
+ my_free(const_cast<uchar*>(frm.str));
+ goto end_temporary;
+ }
+@@ -10275,6 +10410,11 @@ do_continue:;
+ - Neither old or new engine uses files from another engine
+ The above is mainly true for the sequence and the partition engine.
+ */
++ if (thd->lex->previous_commit_id)
++ {
++ DBUG_ASSERT(thd->slave_thread);
++ wait_for_master(thd, send_query, info);
++ }
+ engine_changed= ((new_table->file->ht != table->file->ht) &&
+ (((!(new_table->file->ha_table_flags() & HA_FILE_BASED) ||
+ !(table->file->ha_table_flags() & HA_FILE_BASED))) ||
+@@ -10441,7 +10581,21 @@ do_continue:;
+ DBUG_ASSERT(!(mysql_bin_log.is_open() &&
+ thd->is_current_stmt_binlog_format_row() &&
+ (create_info->tmp_table())));
+- if (write_bin_log(thd, true, thd->query(), thd->query_length()))
++ if (partial_alter)
++ {
++ sprintf(send_query, "/*!100001 COMMIT %lld */ %s", thd->thread_id, thd->query());
++ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
++ DBUG_RETURN(true);
++ }
++ else if(thd->lex->previous_commit_id)
++ {
++ if(write_bin_log(thd, FALSE, send_query, strlen(send_query), true))
++ DBUG_RETURN(true);
++ info->state= start_alter_state::COMMITTED_ALTER;
++ Master_info *mi= thd->rgi_slave->rli->mi;
++ mysql_cond_broadcast(&mi->start_alter_cond);
++ }
++ else if (write_bin_log(thd, true, thd->query(), thd->query_length()))
+ DBUG_RETURN(true);
+
+ table_list->table= NULL; // For query cache
+@@ -10504,6 +10658,8 @@ do_continue:;
+ expects that error is set
+ */
+ write_bin_log(thd, FALSE, thd->query(), thd->query_length());
++//OLTODO
++//Commit with error
+
+ err_with_mdl:
+ /*
+diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
+index 05e68f3b752..a1fce8396e1 100644
+--- a/sql/sql_yacc.yy
++++ b/sql/sql_yacc.yy
+@@ -2141,6 +2141,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize);
+ opt_delete_gtid_domain
+ asrow_attribute
+ opt_constraint_no_id
++ start_cmnd
+
+ %type <NONE> call sp_proc_stmts sp_proc_stmts1 sp_proc_stmt
+ %type <NONE> sp_proc_stmt_statement sp_proc_stmt_return
+@@ -2299,7 +2300,7 @@ verb_clause:
+
+ /* Verb clauses, except begin and compound_statement */
+ statement:
+- alter
++ start_cmnd alter
+ | analyze
+ | analyze_stmt_command
+ | backup
+@@ -7802,6 +7803,19 @@ string_list:
+ { Lex->last_field->interval_list.push_back($3, thd->mem_root); }
+ ;
+
++start_cmnd:
++ /* empty*/ {}
++ | START_SYM
++ {
++ if (thd->variables.pseudo_thread_id)
++ Lex->previous_commit_id= thd->variables.pseudo_thread_id;
++ }
++ | START_SYM ulonglong_num
++ {
++ Lex->previous_commit_id= $2;
++ }
++ ;
++
+ /*
+ ** Alter table
+ */
+@@ -17981,6 +17995,19 @@ commit:
+ lex->tx_chain= $3;
+ lex->tx_release= $4;
+ }
++ | COMMIT_SYM alter
++ {
++ LEX *lex=Lex;
++ lex->sql_command= SQLCOM_COMMIT_PREVIOUS;
++ if (thd->variables.pseudo_thread_id)
++ lex->previous_commit_id= thd->variables.pseudo_thread_id;
++ }
++ | COMMIT_SYM ulonglong_num alter
++ {
++ LEX *lex=Lex;
++ lex->sql_command= SQLCOM_COMMIT_PREVIOUS;
++ lex->previous_commit_id= $2;
++ }
+ ;
+
+ rollback:
+@@ -18005,6 +18032,19 @@ rollback:
+ lex->sql_command= SQLCOM_ROLLBACK_TO_SAVEPOINT;
+ lex->ident= $4;
+ }
++ | ROLLBACK_SYM ALTER
++ {
++ LEX *lex=Lex;
++ lex->sql_command= SQLCOM_ROLLBACK_PREVIOUS;
++ if (thd->variables.pseudo_thread_id)
++ lex->previous_commit_id= thd->variables.pseudo_thread_id;
++ }
++ | ROLLBACK_SYM ulonglong_num ALTER
++ {
++ LEX *lex=Lex;
++ lex->sql_command= SQLCOM_ROLLBACK_PREVIOUS;
++ lex->previous_commit_id= $2;
++ }
+ ;
+
+ savepoint:
+diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
+index ff92b042ccc..e72fca75f63 100644
+--- a/sql/sys_vars.cc
++++ b/sql/sys_vars.cc
+@@ -2234,6 +2234,11 @@ static Sys_var_bit Sys_skip_parallel_replication(
+ SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_SKIP_PARALLEL,
+ DEFAULT(FALSE));
+
++static Sys_var_mybool Sys_binlog_split_alter(
++ "binlog_split_alter",
++ "If set split the alter into 2 statement START ALTER and COMMIT/ROLLBACK"
++ "ALTER",
++ GLOBAL_VAR(opt_binlog_split_alter), NO_CMD_LINE, DEFAULT(FALSE));
+
+ static bool
+ check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var)