diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 164 |
1 files changed, 161 insertions, 3 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 27dddb69144..c19faf21647 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -44,7 +44,7 @@ #include <mysql/psi/mysql_statement.h> #include <strfunc.h> #include "compat56.h" - +#include "wsrep_mysqld.h" #endif /* MYSQL_CLIENT */ #include <base64.h> @@ -3092,6 +3092,15 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, { time_t end_time; +#ifdef WITH_WSREP + /* + If Query_log_event will contain non trans keyword (not BEGIN, COMMIT, + SAVEPOINT or ROLLBACK) we disable PA for this transaction. + */ + if (WSREP_ON && !is_trans_keyword()) + thd->wsrep_PA_safe= false; +#endif /* WITH_WSREP */ + memset(&user, 0, sizeof(user)); memset(&host, 0, sizeof(host)); @@ -4508,6 +4517,22 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi) DBUG_RETURN(Log_event::EVENT_SKIP_COUNT); } } +#ifdef WITH_WSREP + else if (WSREP_ON && wsrep_mysql_replication_bundle && opt_slave_domain_parallel_threads == 0 && + thd->wsrep_mysql_replicated > 0 && + (is_begin() || is_commit())) + { + if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle) + { + WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated); + DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE); + } + else + { + thd->wsrep_mysql_replicated = 0; + } + } +#endif DBUG_RETURN(Log_event::do_shall_skip(rgi)); } @@ -7337,6 +7362,21 @@ Xid_log_event::do_shall_skip(rpl_group_info *rgi) thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_GTID_BEGIN); DBUG_RETURN(Log_event::EVENT_SKIP_COUNT); } +#ifdef WITH_WSREP + else if (wsrep_mysql_replication_bundle && WSREP_ON && + opt_slave_domain_parallel_threads == 0) + { + if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle) + { + WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated); + DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE); + } + else + { + thd->wsrep_mysql_replicated = 0; + } + } +#endif DBUG_RETURN(Log_event::do_shall_skip(rgi)); } #endif /* !MYSQL_CLIENT */ @@ -9614,6 +9654,18 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) if (open_and_lock_tables(thd, rgi->tables_to_lock, FALSE, 0)) { uint actual_error= thd->get_stmt_da()->sql_errno(); +#ifdef WITH_WSREP + if (WSREP(thd)) + { + WSREP_WARN("BF applier failed to open_and_lock_tables: %u, fatal: %d " + "wsrep = (exec_mode: %d conflict_state: %d seqno: %lld)", + thd->get_stmt_da()->sql_errno(), + thd->is_fatal_error, + thd->wsrep_exec_mode, + thd->wsrep_conflict_state, + (long long)wsrep_thd_trx_seqno(thd)); + } +#endif if (thd->is_slave_error || thd->is_fatal_error) { /* @@ -10760,8 +10812,8 @@ check_table_map(rpl_group_info *rgi, RPL_TABLE_LIST *table_list) DBUG_ENTER("check_table_map"); enum_tbl_map_status res= OK_TO_PROCESS; Relay_log_info *rli= rgi->rli; - - if (rgi->thd->slave_thread /* filtering is for slave only */ && + if ((rgi->thd->slave_thread /* filtering is for slave only */ || + IF_WSREP((WSREP(rgi->thd) && rgi->thd->wsrep_applier), 0)) && (!rli->mi->rpl_filter->db_ok(table_list->db) || (rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list)))) res= FILTERED_OUT; @@ -11509,7 +11561,19 @@ int Write_rows_log_event::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); + const char *tmp= thd->get_proc_info(); + const char *message= "Write_rows_log_event::write_row()"; + +#ifdef WSREP_PROC_INFO + my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "Write_rows_log_event::write_row(%lld)", + (long long) wsrep_thd_trx_seqno(thd)); + message= thd->wsrep_info; +#endif /* WSREP_PROC_INFO */ + + thd_proc_info(thd, message); int error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT); + thd_proc_info(thd, tmp); if (error && !thd->is_error()) { @@ -12187,15 +12251,34 @@ Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) { int error; + const char *tmp= thd->get_proc_info(); + const char *message= "Delete_rows_log_event::find_row()"; const bool invoke_triggers= slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers; DBUG_ASSERT(m_table != NULL); +#ifdef WSREP_PROC_INFO + my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "Delete_rows_log_event::find_row(%lld)", + (long long) wsrep_thd_trx_seqno(thd)); + message= thd->wsrep_info; +#endif /* WSREP_PROC_INFO */ + + thd_proc_info(thd, message); if (!(error= find_row(rgi))) { /* Delete the record found, located in record[0] */ + message= "Delete_rows_log_event::ha_delete_row()"; +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "Delete_rows_log_event::ha_delete_row(%lld)", + (long long) wsrep_thd_trx_seqno(thd)); + message= thd->wsrep_info; +#endif + thd_proc_info(thd, message); + if (invoke_triggers && process_triggers(TRG_EVENT_DELETE, TRG_ACTION_BEFORE, FALSE)) error= HA_ERR_GENERIC; // in case if error is not set yet @@ -12206,6 +12289,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) error= HA_ERR_GENERIC; // in case if error is not set yet m_table->file->ha_index_or_rnd_end(); } + thd_proc_info(thd, tmp); return error; } @@ -12333,8 +12417,18 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) { const bool invoke_triggers= slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers; + const char *tmp= thd->get_proc_info(); + const char *message= "Update_rows_log_event::find_row()"; DBUG_ASSERT(m_table != NULL); +#ifdef WSREP_PROC_INFO + my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "Update_rows_log_event::find_row(%lld)", + (long long) wsrep_thd_trx_seqno(thd)); + message= thd->wsrep_info; +#endif /* WSREP_PROC_INFO */ + + thd_proc_info(thd, message); int error= find_row(rgi); if (error) { @@ -12344,6 +12438,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) */ m_curr_row= m_curr_row_end; unpack_current_row(rgi); + thd_proc_info(thd, tmp); return error; } @@ -12361,7 +12456,16 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) store_record(m_table,record[1]); m_curr_row= m_curr_row_end; + message= "Update_rows_log_event::unpack_current_row()"; +#ifdef WSREP_PROC_INFO + my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "Update_rows_log_event::unpack_current_row(%lld)", + (long long) wsrep_thd_trx_seqno(thd)); + message= thd->wsrep_info; +#endif /* WSREP_PROC_INFO */ + /* this also updates m_curr_row_end */ + thd_proc_info(thd, message); if ((error= unpack_current_row(rgi))) goto err; @@ -12379,6 +12483,15 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength); #endif + message= "Update_rows_log_event::ha_update_row()"; +#ifdef WSREP_PROC_INFO + my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "Update_rows_log_event::ha_update_row(%lld)", + (long long) wsrep_thd_trx_seqno(thd)); + message= thd->wsrep_info; +#endif /* WSREP_PROC_INFO */ + + thd_proc_info(thd, message); if (invoke_triggers && process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_BEFORE, TRUE)) { @@ -12394,6 +12507,8 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER, TRUE)) error= HA_ERR_GENERIC; // in case if error is not set yet + thd_proc_info(thd, tmp); + err: m_table->file->ha_index_or_rnd_end(); return error; @@ -12483,6 +12598,49 @@ void Incident_log_event::pack_info(THD *thd, Protocol *protocol) m_incident, description(), m_message.str); protocol->store(buf, bytes, &my_charset_bin); } +#endif /* MYSQL_CLIENT */ + + +#if WITH_WSREP && !defined(MYSQL_CLIENT) +Format_description_log_event *wsrep_format_desc; // TODO: free them at the end +/* + read the first event from (*buf). The size of the (*buf) is (*buf_len). + At the end (*buf) is shitfed to point to the following event or NULL and + (*buf_len) will be changed to account just being read bytes of the 1st event. +*/ +#define WSREP_MAX_ALLOWED_PACKET 1024*1024*1024 // current protocol max + +Log_event* wsrep_read_log_event( + char **arg_buf, size_t *arg_buf_len, + const Format_description_log_event *description_event) +{ + char *head= (*arg_buf); + uint data_len = uint4korr(head + EVENT_LEN_OFFSET); + char *buf= (*arg_buf); + const char *error= 0; + Log_event *res= 0; + DBUG_ENTER("wsrep_read_log_event"); + + if (data_len > WSREP_MAX_ALLOWED_PACKET) + { + error = "Event too big"; + goto err; + } + + res= Log_event::read_log_event(buf, data_len, &error, description_event, false); + +err: + if (!res) + { + DBUG_ASSERT(error != 0); + sql_print_error("Error in Log_event::read_log_event(): " + "'%s', data_len: %d, event_type: %d", + error,data_len,head[EVENT_TYPE_OFFSET]); + } + (*arg_buf)+= data_len; + (*arg_buf_len)-= data_len; + DBUG_RETURN(res); +} #endif |