summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc143
1 files changed, 141 insertions, 2 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index fce0130e8dd..2968764920e 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -49,6 +49,9 @@
#include "sql_show.h" // append_identifier
#include <strfunc.h>
+#if WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
#endif /* MYSQL_CLIENT */
#include <base64.h>
@@ -2873,7 +2876,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
master_data_written(0)
{
time_t end_time;
-
+#ifdef WITH_WSREP
+ thd->wsrep_PA_safe= false;
+#endif /* WITH_WSREP */
memset(&user, 0, sizeof(user));
memset(&host, 0, sizeof(host));
@@ -9078,10 +9083,21 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
-
if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
{
uint actual_error= thd->stmt_da->sql_errno();
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ {
+ WSREP_WARN("BF applier failed to open_and_lock_tables: %u, fatal: %d "
+ "wsrep = (exec_mode: %d conflict_state: %d seqno: %lld)",
+ thd->stmt_da->sql_errno(),
+ thd->is_fatal_error,
+ thd->wsrep_exec_mode,
+ thd->wsrep_conflict_state,
+ (long long)thd->wsrep_trx_seqno);
+ }
+#endif
if (thd->is_slave_error || thd->is_fatal_error)
{
/*
@@ -10190,7 +10206,12 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list)
DBUG_ENTER("check_table_map");
enum_tbl_map_status res= OK_TO_PROCESS;
+#ifdef WITH_WSREP
+ if ((rli->sql_thd->slave_thread /* filtering is for slave only */ ||
+ (WSREP(rli->sql_thd) && rli->sql_thd->wsrep_applier)) &&
+#else
if (rli->sql_thd->slave_thread /* filtering is for slave only */ &&
+#endif /* WITH_WSREP */
(!rli->mi->rpl_filter->db_ok(table_list->db) ||
(rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list))))
res= FILTERED_OUT;
@@ -10833,8 +10854,23 @@ int
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
{
DBUG_ASSERT(m_table != NULL);
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
+#else
+ const char* tmp = (WSREP(thd)) ?
+ thd_proc_info(thd,"Write_rows_log_event::write_row()") : NULL;
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp);
+#endif /* WITH_WSREP */
if (error && !thd->is_error())
{
DBUG_ASSERT(0);
@@ -11499,14 +11535,39 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
int error;
DBUG_ASSERT(m_table != NULL);
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
+#else
+ const char* tmp = (WSREP(thd)) ?
+ thd_proc_info(thd,"Delete_rows_log_event::find_row()") : NULL;
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
if (!(error= find_row(rli)))
{
/*
Delete the record found, located in record[0]
*/
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ snprintf(info, sizeof(info) - 1,
+ "Delete_rows_log_event::ha_delete_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ if (WSREP(thd)) thd_proc_info(thd, info);
+#else
+ if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()");
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
error= m_table->file->ha_delete_row(m_table->record[0]);
m_table->file->ha_index_or_rnd_end();
}
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp);
+#endif /* WITH_WSREP */
return error;
}
@@ -11624,6 +11685,18 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
{
DBUG_ASSERT(m_table != NULL);
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
+#else
+ const char* tmp = (WSREP(thd)) ?
+ thd_proc_info(thd,"Update_rows_log_event::find_row()") : NULL;
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
int error= find_row(rli);
if (error)
{
@@ -11650,6 +11723,17 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
store_record(m_table,record[1]);
m_curr_row= m_curr_row_end;
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ snprintf(info, sizeof(info) - 1,
+ "Update_rows_log_event::unpack_current_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ if (WSREP(thd)) thd_proc_info(thd, info);
+#else
+ if (WSREP(thd))
+ thd_proc_info(thd,"Update_rows_log_event::unpack_current_row()");
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
/* this also updates m_curr_row_end */
if ((error= unpack_current_row(rli)))
goto err;
@@ -11668,10 +11752,23 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
#endif
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ snprintf(info, sizeof(info) - 1,
+ "Update_rows_log_event::ha_update_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ if (WSREP(thd)) thd_proc_info(thd, info);
+#else
+ if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()");
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
if (error == HA_ERR_RECORD_IS_THE_SAME)
error= 0;
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp);
+#endif /* WITH_WSREP */
err:
m_table->file->ha_index_or_rnd_end();
return error;
@@ -11756,6 +11853,48 @@ void Incident_log_event::pack_info(THD *thd, Protocol *protocol)
protocol->store(buf, bytes, &my_charset_bin);
}
#endif
+#if WITH_WSREP && !defined(MYSQL_CLIENT)
+Format_description_log_event *wsrep_format_desc; // TODO: free them at the end
+/*
+ read the first event from (*buf). The size of the (*buf) is (*buf_len).
+ At the end (*buf) is shitfed to point to the following event or NULL and
+ (*buf_len) will be changed to account just being read bytes of the 1st event.
+*/
+#define WSREP_MAX_ALLOWED_PACKET 1024*1024*1024 // current protocol max
+
+Log_event* wsrep_read_log_event(
+ char **arg_buf, size_t *arg_buf_len,
+ const Format_description_log_event *description_event)
+{
+ DBUG_ENTER("wsrep_read_log_event");
+ char *head= (*arg_buf);
+
+ uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
+ char *buf= (*arg_buf);
+ const char *error= 0;
+ Log_event *res= 0;
+
+ if (data_len > WSREP_MAX_ALLOWED_PACKET)
+ {
+ error = "Event too big";
+ goto err;
+ }
+
+ res= Log_event::read_log_event(buf, data_len, &error, description_event, FALSE);
+
+err:
+ if (!res)
+ {
+ DBUG_ASSERT(error != 0);
+ sql_print_error("Error in Log_event::read_log_event(): "
+ "'%s', data_len: %d, event_type: %d",
+ error,data_len,head[EVENT_TYPE_OFFSET]);
+ }
+ (*arg_buf)+= data_len;
+ (*arg_buf_len)-= data_len;
+ DBUG_RETURN(res);
+}
+#endif
#ifdef MYSQL_CLIENT