summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc164
1 files changed, 161 insertions, 3 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 27dddb69144..c19faf21647 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -44,7 +44,7 @@
#include <mysql/psi/mysql_statement.h>
#include <strfunc.h>
#include "compat56.h"
-
+#include "wsrep_mysqld.h"
#endif /* MYSQL_CLIENT */
#include <base64.h>
@@ -3092,6 +3092,15 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
{
time_t end_time;
+#ifdef WITH_WSREP
+ /*
+ If Query_log_event will contain non trans keyword (not BEGIN, COMMIT,
+ SAVEPOINT or ROLLBACK) we disable PA for this transaction.
+ */
+ if (WSREP_ON && !is_trans_keyword())
+ thd->wsrep_PA_safe= false;
+#endif /* WITH_WSREP */
+
memset(&user, 0, sizeof(user));
memset(&host, 0, sizeof(host));
@@ -4508,6 +4517,22 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi)
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
}
+#ifdef WITH_WSREP
+ else if (WSREP_ON && wsrep_mysql_replication_bundle && opt_slave_domain_parallel_threads == 0 &&
+ thd->wsrep_mysql_replicated > 0 &&
+ (is_begin() || is_commit()))
+ {
+ if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle)
+ {
+ WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated);
+ DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE);
+ }
+ else
+ {
+ thd->wsrep_mysql_replicated = 0;
+ }
+ }
+#endif
DBUG_RETURN(Log_event::do_shall_skip(rgi));
}
@@ -7337,6 +7362,21 @@ Xid_log_event::do_shall_skip(rpl_group_info *rgi)
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_GTID_BEGIN);
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
+#ifdef WITH_WSREP
+ else if (wsrep_mysql_replication_bundle && WSREP_ON &&
+ opt_slave_domain_parallel_threads == 0)
+ {
+ if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle)
+ {
+ WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated);
+ DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE);
+ }
+ else
+ {
+ thd->wsrep_mysql_replicated = 0;
+ }
+ }
+#endif
DBUG_RETURN(Log_event::do_shall_skip(rgi));
}
#endif /* !MYSQL_CLIENT */
@@ -9614,6 +9654,18 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (open_and_lock_tables(thd, rgi->tables_to_lock, FALSE, 0))
{
uint actual_error= thd->get_stmt_da()->sql_errno();
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ {
+ WSREP_WARN("BF applier failed to open_and_lock_tables: %u, fatal: %d "
+ "wsrep = (exec_mode: %d conflict_state: %d seqno: %lld)",
+ thd->get_stmt_da()->sql_errno(),
+ thd->is_fatal_error,
+ thd->wsrep_exec_mode,
+ thd->wsrep_conflict_state,
+ (long long)wsrep_thd_trx_seqno(thd));
+ }
+#endif
if (thd->is_slave_error || thd->is_fatal_error)
{
/*
@@ -10760,8 +10812,8 @@ check_table_map(rpl_group_info *rgi, RPL_TABLE_LIST *table_list)
DBUG_ENTER("check_table_map");
enum_tbl_map_status res= OK_TO_PROCESS;
Relay_log_info *rli= rgi->rli;
-
- if (rgi->thd->slave_thread /* filtering is for slave only */ &&
+ if ((rgi->thd->slave_thread /* filtering is for slave only */ ||
+ IF_WSREP((WSREP(rgi->thd) && rgi->thd->wsrep_applier), 0)) &&
(!rli->mi->rpl_filter->db_ok(table_list->db) ||
(rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list))))
res= FILTERED_OUT;
@@ -11509,7 +11561,19 @@ int
Write_rows_log_event::do_exec_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table != NULL);
+ const char *tmp= thd->get_proc_info();
+ const char *message= "Write_rows_log_event::write_row()";
+
+#ifdef WSREP_PROC_INFO
+ my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "Write_rows_log_event::write_row(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ message= thd->wsrep_info;
+#endif /* WSREP_PROC_INFO */
+
+ thd_proc_info(thd, message);
int error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
+ thd_proc_info(thd, tmp);
if (error && !thd->is_error())
{
@@ -12187,15 +12251,34 @@ Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability
int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
{
int error;
+ const char *tmp= thd->get_proc_info();
+ const char *message= "Delete_rows_log_event::find_row()";
const bool invoke_triggers=
slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers;
DBUG_ASSERT(m_table != NULL);
+#ifdef WSREP_PROC_INFO
+ my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "Delete_rows_log_event::find_row(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ message= thd->wsrep_info;
+#endif /* WSREP_PROC_INFO */
+
+ thd_proc_info(thd, message);
if (!(error= find_row(rgi)))
{
/*
Delete the record found, located in record[0]
*/
+ message= "Delete_rows_log_event::ha_delete_row()";
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "Delete_rows_log_event::ha_delete_row(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ message= thd->wsrep_info;
+#endif
+ thd_proc_info(thd, message);
+
if (invoke_triggers &&
process_triggers(TRG_EVENT_DELETE, TRG_ACTION_BEFORE, FALSE))
error= HA_ERR_GENERIC; // in case if error is not set yet
@@ -12206,6 +12289,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
error= HA_ERR_GENERIC; // in case if error is not set yet
m_table->file->ha_index_or_rnd_end();
}
+ thd_proc_info(thd, tmp);
return error;
}
@@ -12333,8 +12417,18 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
{
const bool invoke_triggers=
slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers;
+ const char *tmp= thd->get_proc_info();
+ const char *message= "Update_rows_log_event::find_row()";
DBUG_ASSERT(m_table != NULL);
+#ifdef WSREP_PROC_INFO
+ my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "Update_rows_log_event::find_row(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ message= thd->wsrep_info;
+#endif /* WSREP_PROC_INFO */
+
+ thd_proc_info(thd, message);
int error= find_row(rgi);
if (error)
{
@@ -12344,6 +12438,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
*/
m_curr_row= m_curr_row_end;
unpack_current_row(rgi);
+ thd_proc_info(thd, tmp);
return error;
}
@@ -12361,7 +12456,16 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
store_record(m_table,record[1]);
m_curr_row= m_curr_row_end;
+ message= "Update_rows_log_event::unpack_current_row()";
+#ifdef WSREP_PROC_INFO
+ my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "Update_rows_log_event::unpack_current_row(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ message= thd->wsrep_info;
+#endif /* WSREP_PROC_INFO */
+
/* this also updates m_curr_row_end */
+ thd_proc_info(thd, message);
if ((error= unpack_current_row(rgi)))
goto err;
@@ -12379,6 +12483,15 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
#endif
+ message= "Update_rows_log_event::ha_update_row()";
+#ifdef WSREP_PROC_INFO
+ my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "Update_rows_log_event::ha_update_row(%lld)",
+ (long long) wsrep_thd_trx_seqno(thd));
+ message= thd->wsrep_info;
+#endif /* WSREP_PROC_INFO */
+
+ thd_proc_info(thd, message);
if (invoke_triggers &&
process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_BEFORE, TRUE))
{
@@ -12394,6 +12507,8 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER, TRUE))
error= HA_ERR_GENERIC; // in case if error is not set yet
+ thd_proc_info(thd, tmp);
+
err:
m_table->file->ha_index_or_rnd_end();
return error;
@@ -12483,6 +12598,49 @@ void Incident_log_event::pack_info(THD *thd, Protocol *protocol)
m_incident, description(), m_message.str);
protocol->store(buf, bytes, &my_charset_bin);
}
+#endif /* MYSQL_CLIENT */
+
+
+#if WITH_WSREP && !defined(MYSQL_CLIENT)
+Format_description_log_event *wsrep_format_desc; // TODO: free them at the end
+/*
+ read the first event from (*buf). The size of the (*buf) is (*buf_len).
+ At the end (*buf) is shitfed to point to the following event or NULL and
+ (*buf_len) will be changed to account just being read bytes of the 1st event.
+*/
+#define WSREP_MAX_ALLOWED_PACKET 1024*1024*1024 // current protocol max
+
+Log_event* wsrep_read_log_event(
+ char **arg_buf, size_t *arg_buf_len,
+ const Format_description_log_event *description_event)
+{
+ char *head= (*arg_buf);
+ uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
+ char *buf= (*arg_buf);
+ const char *error= 0;
+ Log_event *res= 0;
+ DBUG_ENTER("wsrep_read_log_event");
+
+ if (data_len > WSREP_MAX_ALLOWED_PACKET)
+ {
+ error = "Event too big";
+ goto err;
+ }
+
+ res= Log_event::read_log_event(buf, data_len, &error, description_event, false);
+
+err:
+ if (!res)
+ {
+ DBUG_ASSERT(error != 0);
+ sql_print_error("Error in Log_event::read_log_event(): "
+ "'%s', data_len: %d, event_type: %d",
+ error,data_len,head[EVENT_TYPE_OFFSET]);
+ }
+ (*arg_buf)+= data_len;
+ (*arg_buf_len)-= data_len;
+ DBUG_RETURN(res);
+}
#endif