diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-06-28 15:19:30 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-06-28 15:19:30 +0200 |
commit | 1b3dc66e3117a09c95a00be2f649b975fdb25e2e (patch) | |
tree | db25dcfc0af5b9cccda27ee022dd8bd7acee0ce2 /sql | |
parent | 7e5dc4f074b7d1cee4721e6fa49d6e5628ef793f (diff) | |
download | mariadb-git-1b3dc66e3117a09c95a00be2f649b975fdb25e2e.tar.gz |
MDEV-4506: Parallel replication: Intermediate commit.
First step of splitting out part of Relay_log_info, so that different
event groups being applied in parallel can each use their own copy.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log_event.cc | 88 | ||||
-rw-r--r-- | sql/log_event.h | 52 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 9 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 10 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 2 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 4 | ||||
-rw-r--r-- | sql/rpl_rli.h | 31 | ||||
-rw-r--r-- | sql/rpl_utility.cc | 5 | ||||
-rw-r--r-- | sql/rpl_utility.h | 2 | ||||
-rw-r--r-- | sql/slave.cc | 30 | ||||
-rw-r--r-- | sql/slave.h | 3 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 14 |
12 files changed, 161 insertions, 89 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 431f8b47f2d..8bbc43dec35 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3755,9 +3755,9 @@ void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Query_log_event::do_apply_event(Relay_log_info const *rli) +int Query_log_event::do_apply_event(struct rpl_group_info *rgi) { - return do_apply_event(rli, query, q_len); + return do_apply_event(rgi, query, q_len); } /** @@ -3806,7 +3806,7 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error) mismatch. This mismatch could be implemented with a new ER_ code, and to ignore it you would use --slave-skip-errors... */ -int Query_log_event::do_apply_event(Relay_log_info const *rli, +int Query_log_event::do_apply_event(struct rpl_group_info *rgi, const char *query_arg, uint32 q_len_arg) { LEX_STRING new_db; @@ -3814,6 +3814,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, HA_CREATE_INFO db_options; uint64 sub_id= 0; rpl_gtid gtid; + Relay_log_info const *rli= rgi->rli; Rpl_filter *rpl_filter= rli->mi->rpl_filter; DBUG_ENTER("Query_log_event::do_apply_event"); @@ -4006,12 +4007,12 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, Record any GTID in the same transaction, so slave state is transactionally consistent. */ - if (strcmp("COMMIT", query) == 0 && (sub_id= rli->gtid_sub_id)) + if (strcmp("COMMIT", query) == 0 && (sub_id= rgi->gtid_sub_id)) { /* Clear the GTID from the RLI so we don't accidentally reuse it. */ - const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0; + rgi->gtid_sub_id= 0; - gtid= rli->current_gtid; + gtid= rgi->current_gtid; if (rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true, false)) { rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, @@ -4458,10 +4459,12 @@ bool Start_log_event_v3::write(IO_CACHE* file) other words, no deadlock problem. */ -int Start_log_event_v3::do_apply_event(Relay_log_info const *rli) +int Start_log_event_v3::do_apply_event(struct rpl_group_info *rgi) { DBUG_ENTER("Start_log_event_v3::do_apply_event"); int error= 0; + Relay_log_info const *rli= rgi->rli; + switch (binlog_version) { case 3: @@ -4805,9 +4808,10 @@ bool Format_description_log_event::write(IO_CACHE* file) #endif #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Format_description_log_event::do_apply_event(Relay_log_info const *rli) +int Format_description_log_event::do_apply_event(struct rpl_group_info *rgi) { int ret= 0; + Relay_log_info const *rli= rgi->rli; DBUG_ENTER("Format_description_log_event::do_apply_event"); /* @@ -4848,7 +4852,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli) 0, then 96, then jump to first really asked event (which is >96). So this is ok. */ - ret= Start_log_event_v3::do_apply_event(rli); + ret= Start_log_event_v3::do_apply_event(rgi); } if (!ret) @@ -5509,10 +5513,11 @@ void Load_log_event::set_fields(const char* affected_db, 1 Failure */ -int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, +int Load_log_event::do_apply_event(NET* net, struct rpl_group_info *rgi, bool use_rli_only_for_errors) { LEX_STRING new_db; + Relay_log_info const *rli= rgi->rli; Rpl_filter *rpl_filter= rli->mi->rpl_filter; DBUG_ENTER("Load_log_event::do_apply_event"); @@ -5776,7 +5781,7 @@ Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'", DBUG_RETURN(1); } - DBUG_RETURN( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) ); + DBUG_RETURN( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rgi) ); } #endif @@ -6245,7 +6250,7 @@ Gtid_log_event::pack_info(THD *thd, Protocol *protocol) static char gtid_begin_string[] = "BEGIN"; int -Gtid_log_event::do_apply_event(Relay_log_info const *rli) +Gtid_log_event::do_apply_event(struct rpl_group_info *rgi) { thd->variables.server_id= this->server_id; thd->variables.gtid_domain_id= this->domain_id; @@ -6467,9 +6472,10 @@ Gtid_list_log_event::write(IO_CACHE *file) int -Gtid_list_log_event::do_apply_event(Relay_log_info const *rli) +Gtid_list_log_event::do_apply_event(struct rpl_group_info *rgi) { - int ret= Log_event::do_apply_event(rli); + Relay_log_info const *rli= rgi->rli; + int ret= Log_event::do_apply_event(rgi); if (rli->until_condition == Relay_log_info::UNTIL_GTID && (gl_flags & FLAG_UNTIL_REACHED)) { @@ -6696,13 +6702,14 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) Intvar_log_event::do_apply_event() */ -int Intvar_log_event::do_apply_event(Relay_log_info const *rli) +int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi) { + Relay_log_info *rli= rgi->rli; /* We are now in a statement until the associated query log event has been processed. */ - const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT); + rli->set_flag(Relay_log_info::IN_STMT); if (rli->deferred_events_collecting) return rli->deferred_events->add(this); @@ -6805,8 +6812,9 @@ void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Rand_log_event::do_apply_event(Relay_log_info const *rli) +int Rand_log_event::do_apply_event(struct rpl_group_info *rgi) { + Relay_log_info const *rli= rgi->rli; /* We are now in a statement until the associated query log event has been processed. @@ -6860,7 +6868,7 @@ bool slave_execute_deferred_events(THD *thd) if (!rli->deferred_events_collecting || rli->deferred_events->is_empty()) return res; - res= rli->deferred_events->execute(rli); + res= rli->deferred_events->execute(rli->group_info); return res; } @@ -6935,23 +6943,24 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Xid_log_event::do_apply_event(Relay_log_info const *rli) +int Xid_log_event::do_apply_event(struct rpl_group_info *rgi) { bool res; int err; rpl_gtid gtid; uint64 sub_id; + Relay_log_info const *rli= rgi->rli; /* Record any GTID in the same transaction, so slave state is transactionally consistent. */ - if ((sub_id= rli->gtid_sub_id)) + if ((sub_id= rgi->gtid_sub_id)) { /* Clear the GTID from the RLI so we don't accidentally reuse it. */ - const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0; + rgi->gtid_sub_id= 0; - gtid= rli->current_gtid; + gtid= rgi->current_gtid; err= rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true, false); if (err) { @@ -7400,10 +7409,11 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int User_var_log_event::do_apply_event(Relay_log_info const *rli) +int User_var_log_event::do_apply_event(struct rpl_group_info *rgi) { Item *it= 0; CHARSET_INFO *charset; + Relay_log_info const *rli= rgi->rli; DBUG_ENTER("User_var_log_event::do_apply_event"); if (rli->deferred_events_collecting) @@ -7664,7 +7674,7 @@ Slave_log_event::Slave_log_event(const char* buf, #ifndef MYSQL_CLIENT -int Slave_log_event::do_apply_event(Relay_log_info const *rli) +int Slave_log_event::do_apply_event(struct rpl_group_info *rgi) { if (mysql_bin_log.is_open()) return mysql_bin_log.write(this); @@ -7939,13 +7949,14 @@ void Create_file_log_event::pack_info(THD *thd, Protocol *protocol) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Create_file_log_event::do_apply_event(Relay_log_info const *rli) +int Create_file_log_event::do_apply_event(struct rpl_group_info *rgi) { char proc_info[17+FN_REFLEN+10], *fname_buf; char *ext; int fd = -1; IO_CACHE file; int error = 1; + Relay_log_info const *rli= rgi->rli; bzero((char*)&file, sizeof(file)); fname_buf= strmov(proc_info, "Making temp file "); @@ -8120,11 +8131,12 @@ int Append_block_log_event::get_create_or_append() const Append_block_log_event::do_apply_event() */ -int Append_block_log_event::do_apply_event(Relay_log_info const *rli) +int Append_block_log_event::do_apply_event(struct rpl_group_info *rgi) { char proc_info[17+FN_REFLEN+10], *fname= proc_info+17; int fd; int error = 1; + Relay_log_info const *rli= rgi->rli; DBUG_ENTER("Append_block_log_event::do_apply_event"); fname= strmov(proc_info, "Making temp file "); @@ -8270,9 +8282,10 @@ void Delete_file_log_event::pack_info(THD *thd, Protocol *protocol) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Delete_file_log_event::do_apply_event(Relay_log_info const *rli) +int Delete_file_log_event::do_apply_event(struct rpl_group_info *rgi) { char fname[FN_REFLEN+10]; + Relay_log_info const *rli= rgi->rli; char *ext= slave_load_file_stem(fname, file_id, server_id, ".data", &rli->mi->cmp_connection_name); mysql_file_delete(key_file_log_event_data, fname, MYF(MY_WME)); @@ -8369,7 +8382,7 @@ void Execute_load_log_event::pack_info(THD *thd, Protocol *protocol) Execute_load_log_event::do_apply_event() */ -int Execute_load_log_event::do_apply_event(Relay_log_info const *rli) +int Execute_load_log_event::do_apply_event(struct rpl_group_info *rgi) { char fname[FN_REFLEN+10]; char *ext; @@ -8377,6 +8390,7 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli) int error= 1; IO_CACHE file; Load_log_event *lev= 0; + Relay_log_info const *rli= rgi->rli; ext= slave_load_file_stem(fname, file_id, server_id, ".info", &rli->mi->cmp_connection_name); @@ -8412,7 +8426,7 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli) */ const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos; - if (lev->do_apply_event(0,rli,1)) + if (lev->do_apply_event(0,rgi,1)) { /* We want to indicate the name of the file that could not be loaded @@ -8641,13 +8655,14 @@ void Execute_load_query_log_event::pack_info(THD *thd, Protocol *protocol) int -Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli) +Execute_load_query_log_event::do_apply_event(struct rpl_group_info *rgi) { char *p; char *buf; char *fname; char *fname_end; int error; + Relay_log_info const *rli= rgi->rli; buf= (char*) my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) + (FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME)); @@ -8684,7 +8699,7 @@ Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli) p= strmake(p, STRING_WITH_LEN(" INTO ")); p= strmake(p, query+fn_pos_end, q_len-fn_pos_end); - error= Query_log_event::do_apply_event(rli, buf, p-buf); + error= Query_log_event::do_apply_event(rgi, buf, p-buf); /* Forging file name for deletion in same buffer */ *fname_end= 0; @@ -9048,8 +9063,9 @@ int Rows_log_event::do_add_row_data(uchar *row_data, size_t length) #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Rows_log_event::do_apply_event(Relay_log_info const *rli) +int Rows_log_event::do_apply_event(struct rpl_group_info *rgi) { + Relay_log_info const *rli= rgi->rli; DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)"); int error= 0; /* @@ -9751,7 +9767,7 @@ void Annotate_rows_log_event::print(FILE *file, PRINT_EVENT_INFO *pinfo) #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Annotate_rows_log_event::do_apply_event(Relay_log_info const *rli) +int Annotate_rows_log_event::do_apply_event(struct rpl_group_info *rgi) { m_save_thd_query_txt= thd->query(); m_save_thd_query_len= thd->query_length(); @@ -10269,13 +10285,14 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list) DBUG_RETURN(res); } -int Table_map_log_event::do_apply_event(Relay_log_info const *rli) +int Table_map_log_event::do_apply_event(struct rpl_group_info *rgi) { RPL_TABLE_LIST *table_list; char *db_mem, *tname_mem; size_t dummy_len; void *memory; Rpl_filter *filter; + Relay_log_info const *rli= rgi->rli; DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)"); DBUG_ASSERT(rli->sql_thd == thd); @@ -11818,8 +11835,9 @@ Incident_log_event::print(FILE *file, #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) int -Incident_log_event::do_apply_event(Relay_log_info const *rli) +Incident_log_event::do_apply_event(struct rpl_group_info *rgi) { + Relay_log_info const *rli= rgi->rli; DBUG_ENTER("Incident_log_event::do_apply_event"); rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT, ER(ER_SLAVE_INCIDENT), diff --git a/sql/log_event.h b/sql/log_event.h index 641ab3e37b7..8a60296695b 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1317,9 +1317,9 @@ public: @see do_apply_event */ - int apply_event(Relay_log_info const *rli) + int apply_event(struct rpl_group_info *rgi) { - return do_apply_event(rli); + return do_apply_event(rgi); } @@ -1412,7 +1412,7 @@ protected: @retval 0 Event applied successfully @retval errno Error code if event application failed */ - virtual int do_apply_event(Relay_log_info const *rli) + virtual int do_apply_event(struct rpl_group_info *rgi) { return 0; /* Default implementation does nothing */ } @@ -1966,10 +1966,10 @@ public: public: /* !!! Public in this patch to allow old usage */ #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); virtual int do_update_pos(Relay_log_info *rli); - int do_apply_event(Relay_log_info const *rli, + int do_apply_event(struct rpl_group_info *rgi, const char *query_arg, uint32 q_len_arg); static bool peek_is_commit_rollback(const char *event_start, @@ -2083,7 +2083,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const* rli); + virtual int do_apply_event(struct rpl_group_info *rgi); #endif }; @@ -2396,12 +2396,12 @@ public: public: /* !!! Public in this patch to allow old usage */ #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const* rli) + virtual int do_apply_event(struct rpl_group_info *rgi) { - return do_apply_event(thd->slave_net,rli,0); + return do_apply_event(thd->slave_net,rgi,0); } - int do_apply_event(NET *net, Relay_log_info const *rli, + int do_apply_event(NET *net, struct rpl_group_info *rgi, bool use_rli_only_for_errors); #endif }; @@ -2480,7 +2480,7 @@ public: protected: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); virtual enum_skip_reason do_shall_skip(Relay_log_info*) { /* @@ -2576,7 +2576,7 @@ public: static bool is_version_before_checksum(const master_version_split *version_split); protected: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); virtual int do_update_pos(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif @@ -2655,7 +2655,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg, private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); virtual int do_update_pos(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif @@ -2734,7 +2734,7 @@ class Rand_log_event: public Log_event private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); virtual int do_update_pos(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif @@ -2783,7 +2783,7 @@ class Xid_log_event: public Log_event private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif }; @@ -2850,7 +2850,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); virtual int do_update_pos(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif @@ -3099,7 +3099,7 @@ public: uint16 flags, bool is_transactional, uint64 commit_id); #ifdef HAVE_REPLICATION void pack_info(THD *thd, Protocol *protocol); - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); virtual int do_update_pos(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif @@ -3229,7 +3229,7 @@ public: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) bool to_packet(String *packet); bool write(IO_CACHE *file); - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); #endif static bool peek(const char *event_start, uint32 event_len, uint8 checksum_alg, @@ -3308,7 +3308,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); #endif }; @@ -3363,7 +3363,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); #endif }; @@ -3404,7 +3404,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); #endif }; @@ -3444,7 +3444,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); #endif }; @@ -3543,7 +3543,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); #endif }; @@ -3615,7 +3615,7 @@ public: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) private: - virtual int do_apply_event(Relay_log_info const*); + virtual int do_apply_event(struct rpl_group_info *rgi); virtual int do_update_pos(Relay_log_info*); virtual enum_skip_reason do_shall_skip(Relay_log_info*); #endif @@ -4030,7 +4030,7 @@ public: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); virtual int do_update_pos(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); #endif @@ -4258,7 +4258,7 @@ protected: private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); virtual int do_update_pos(Relay_log_info *rli); virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); @@ -4592,7 +4592,7 @@ public: #endif #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_apply_event(struct rpl_group_info *rgi); #endif virtual bool write_data_header(IO_CACHE *file); diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 71b18e64842..54d3b704a2c 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -65,17 +65,18 @@ int rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli) { uint64 sub_id; + struct rpl_group_info *rgi; /* Update the GTID position, if we have it and did not already update it in a GTID transaction. */ - if ((sub_id= rli->gtid_sub_id)) + if ((rgi= rli->group_info) && (sub_id= rgi->gtid_sub_id)) { - rli->gtid_sub_id= 0; - if (record_gtid(thd, &rli->current_gtid, sub_id, false, false)) + rgi->gtid_sub_id= 0; + if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) return 1; - update_state_hash(sub_id, &rli->current_gtid); + update_state_hash(sub_id, &rgi->current_gtid); } return 0; } diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index f1ac7e83071..8ea4799e94a 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -13,15 +13,18 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, struct rpl_parallel_thread *rpt) { int err; - Relay_log_info *rli= qev->rli; + struct rpl_group_info *rgi= qev->rgi; + Relay_log_info *rli= rgi->rli; thd->rli_slave= rli; thd->rpl_filter = rli->mi->rpl_filter; /* ToDo: Access to thd, and what about rli, split out a parallel part? */ mysql_mutex_lock(&rli->data_lock); - err= apply_event_and_update_pos(qev->ev, thd, rli, rpt); + err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); /* ToDo: error handling. */ /* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */ + my_free(rgi); + rgi= NULL; } @@ -398,7 +401,8 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd) return true; } qev->ev= ev; - qev->rli= rli; + qev->rgi= rli->group_info; + rli->group_info= NULL; /* Avoid conflict with groups applied in parallel */ qev->next= NULL; if (ev->get_type_code() == GTID_EVENT) diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 7e966f1615c..c5bb39cb6fc 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -23,7 +23,7 @@ struct rpl_parallel_thread { struct queued_event { queued_event *next; Log_event *ev; - Relay_log_info *rli; + struct rpl_group_info *rgi; } *event_queue, *last_in_queue; rpl_parallel_thread *wait_for; /* ToDo: change this ... */ }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 12c38f95575..5d5bca1189c 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -59,7 +59,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) abort_pos_wait(0), slave_run_id(0), sql_thd(0), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), - gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0), + group_info(0), tables_to_lock(0), tables_to_lock_count(0), last_event_start_time(0), deferred_events(NULL),m_flags(0), row_stmt_start_timestamp(0), long_find_row_note_printed(false), m_annotate_event(0) @@ -113,6 +113,8 @@ Relay_log_info::~Relay_log_info() mysql_cond_destroy(&sleep_cond); relay_log.cleanup(); free_annotate_event(); + if (group_info) + my_free(group_info); DBUG_VOID_RETURN; } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 452457e9e5a..c02ae6e3adb 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -53,6 +53,8 @@ class Master_info; *****************************************************************************/ +struct rpl_group_info; + class Relay_log_info : public Slave_reporting_capability { public: @@ -312,13 +314,8 @@ public: char slave_patternload_file[FN_REFLEN]; size_t slave_patternload_file_size; - /* - Current GTID being processed. - The sub_id gives the binlog order within one domain_id. A zero sub_id - means that there is no active GTID. - */ - uint64 gtid_sub_id; - rpl_gtid current_gtid; + /* Various data related to the currently executing event group. */ + struct rpl_group_info *group_info; rpl_parallel parallel; Relay_log_info(bool is_slave_recovery); @@ -596,6 +593,26 @@ private: }; +/* + This is data for various state needed to be kept for the processing of + one event group in the SQL thread. + + For single-threaded replication it is linked from the RLI, for parallel + replication it is linked into each event group being executed in parallel. +*/ +struct rpl_group_info +{ + Relay_log_info *rli; + /* + Current GTID being processed. + The sub_id gives the binlog order within one domain_id. A zero sub_id + means that there is no active GTID. + */ + uint64 gtid_sub_id; + rpl_gtid current_gtid; +}; + + // Defined in rpl_rli.cc int init_relay_log_info(Relay_log_info* rli, const char* info_fname); diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc index 6bbe998a624..cce8ef99fef 100644 --- a/sql/rpl_utility.cc +++ b/sql/rpl_utility.cc @@ -1143,9 +1143,10 @@ bool Deferred_log_events::is_empty() return array.elements == 0; } -bool Deferred_log_events::execute(Relay_log_info *rli) +bool Deferred_log_events::execute(struct rpl_group_info *rgi) { bool res= false; + Relay_log_info *rli= rgi->rli; DBUG_ASSERT(rli->deferred_events_collecting); @@ -1154,7 +1155,7 @@ bool Deferred_log_events::execute(Relay_log_info *rli) { Log_event *ev= (* (Log_event **) dynamic_array_ptr(&array, i)); - res= ev->apply_event(rli); + res= ev->apply_event(rgi); } rli->deferred_events_collecting= true; return res; diff --git a/sql/rpl_utility.h b/sql/rpl_utility.h index 79f4517c492..893cc8d3e04 100644 --- a/sql/rpl_utility.h +++ b/sql/rpl_utility.h @@ -275,7 +275,7 @@ public: /* queue for exection at Query-log-event time prior the Query */ int add(Log_event *ev); bool is_empty(); - bool execute(Relay_log_info *rli); + bool execute(struct rpl_group_info *rgi); void rewind(); bool is_last(Log_event *ev) { return ev == last_added; }; }; diff --git a/sql/slave.cc b/sql/slave.cc index d7e4d9a25ed..ace5c7f837b 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3018,10 +3018,12 @@ static int has_temporary_error(THD *thd) @retval 2 No error calling ev->apply_event(), but error calling ev->update_pos(). */ -int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, +int apply_event_and_update_pos(Log_event* ev, THD* thd, + struct rpl_group_info *rgi, rpl_parallel_thread *rpt) { int exec_res= 0; + Relay_log_info* rli= rgi->rli; DBUG_ENTER("apply_event_and_update_pos"); @@ -3080,7 +3082,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, } mysql_mutex_unlock(&rli->data_lock); if (reason == Log_event::EVENT_SKIP_NOT) - exec_res= ev->apply_event(rli); + exec_res= ev->apply_event(rgi); #ifndef DBUG_OFF /* @@ -3244,7 +3246,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) if (opt_slave_parallel_threads > 0) DBUG_RETURN(rli->parallel.do_event(rli, ev, thd)); - exec_res= apply_event_and_update_pos(ev, thd, rli, NULL); + exec_res= apply_event_and_update_pos(ev, thd, rli->group_info, NULL); switch (ev->get_type_code()) { case FORMAT_DESCRIPTION_EVENT: @@ -5734,6 +5736,7 @@ static Log_event* next_event(Relay_log_info* rli) mysql_mutex_t *log_lock = rli->relay_log.get_log_lock(); const char* errmsg=0; THD* thd = rli->sql_thd; + struct rpl_group_info *rgi; DBUG_ENTER("next_event"); DBUG_ASSERT(thd != 0); @@ -5821,6 +5824,19 @@ static Log_event* next_event(Relay_log_info* rli) opt_slave_sql_verify_checksum))) { + if (!(rgi= rli->group_info)) + { + if (!(rgi= rli->group_info= (struct rpl_group_info *) + my_malloc(sizeof(*rgi), MYF(0)))) + { + errmsg = "slave SQL thread aborted because of out-of-memory error"; + if (hot_log) + mysql_mutex_unlock(log_lock); + goto err; + } + bzero(rgi, sizeof(*rgi)); + } + rgi->rli= rli; DBUG_ASSERT(thd==rli->sql_thd); /* read it while we have a lock, to avoid a mutex lock in @@ -5842,10 +5858,10 @@ static Log_event* next_event(Relay_log_info* rli) mysql_mutex_unlock(log_lock); goto err; } - rli->gtid_sub_id= sub_id; - rli->current_gtid.server_id= gev->server_id; - rli->current_gtid.domain_id= gev->domain_id; - rli->current_gtid.seq_no= gev->seq_no; + rgi->gtid_sub_id= sub_id; + rgi->current_gtid.server_id= gev->server_id; + rgi->current_gtid.domain_id= gev->domain_id; + rgi->current_gtid.seq_no= gev->seq_no; } if (hot_log) diff --git a/sql/slave.h b/sql/slave.h index 69b0e011a39..4e64754a877 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -228,7 +228,8 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, void set_slave_thread_options(THD* thd); void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli); int rotate_relay_log(Master_info* mi); -int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, +int apply_event_and_update_pos(Log_event* ev, THD* thd, + struct rpl_group_info *rgi, rpl_parallel_thread *rpt); pthread_handler_t handle_slave_io(void *arg); diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 3bb5deab406..bef9a4c3475 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -44,6 +44,7 @@ void mysql_client_binlog_statement(THD* thd) { + struct rpl_group_info *rgi; DBUG_ENTER("mysql_client_binlog_statement"); DBUG_PRINT("info",("binlog base64: '%*s'", (int) (thd->lex->comment.length < 2048 ? @@ -196,6 +197,17 @@ void mysql_client_binlog_statement(THD* thd) } } + if (!(rgi= rli->group_info)) + { + if (!(rgi= rli->group_info= (struct rpl_group_info *) + my_malloc(sizeof(*rgi), MYF(0)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*rgi)); + goto end; + } + bzero(rgi, sizeof(*rgi)); + } + rgi->rli= rli; ev= Log_event::read_log_event(bufptr, event_len, &error, rli->relay_log.description_event_for_exec, 0); @@ -232,7 +244,7 @@ void mysql_client_binlog_statement(THD* thd) (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); - err= ev->apply_event(rli); + err= ev->apply_event(rgi); thd->variables.option_bits= (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | |