summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc239
1 file changed, 147 insertions, 92 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 77cf719fde5..fc8b1904496 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -90,6 +90,28 @@ static const char *HA_ERR(int i)
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
}
+ return 0;
+}
+
+/**
+  Helper called from the different branches of
+  Rows_log_event::do_apply_event to report a row-event apply failure
+  through the relay-log-info reporting interface.
+
+  @param level     severity of the report (e.g. WARNING_LEVEL, ERROR_LEVEL)
+  @param ha_error  handler (storage engine) error code; mapped to its
+                   symbolic name via HA_ERR(), printed as "<unknown>"
+                   when no mapping exists
+  @param rli       relay log info object whose report() method is invoked
+  @param thd       current thread; thd->net.last_errno is forwarded and
+                   thd->net.last_error, when non-empty, is spliced into
+                   the message followed by ";"
+  @param table     table the failing event was being applied to (its
+                   db and table name appear in the message)
+  @param type      event type name string (caller passes get_type_str())
+  @param log_name  master binary log file name for the message
+  @param pos       end_log_pos of the event in the master log
+*/
+static void inline slave_rows_error_report(enum loglevel level, int ha_error,
+                                           Relay_log_info const *rli, THD *thd,
+                                           TABLE *table, const char * type,
+                                           const char *log_name, ulong pos)
+{
+  const char *handler_error= HA_ERR(ha_error);
+  rli->report(level, thd->net.last_errno,
+              "Could not execute %s event on table %s.%s;"
+              "%s%s handler error %s; "
+              "the event's master log %s, end_log_pos %lu",
+              type, table->s->db.str,
+              table->s->table_name.str,
+              thd->net.last_error[0] != 0 ? thd->net.last_error : "",
+              thd->net.last_error[0] != 0 ? ";" : "",
+              handler_error == NULL? "<unknown>" : handler_error,
+              log_name, pos);
}
#endif
@@ -6060,7 +6082,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
{
DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
int error= 0;
-
/*
If m_table_id == ~0UL, then we have a dummy event that does not
contain any data. In that case, we just remove all tables in the
@@ -6106,6 +6127,24 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
lex_start(thd);
+ /*
+ There are a few flags that are replicated with each row event.
+ Make sure to set/clear them before executing the main body of
+ the event.
+ */
+ if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
+ thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
+ else
+ thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
+
+ if (get_flags(RELAXED_UNIQUE_CHECKS_F))
+ thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
+ else
+ thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+ /* A small test to verify that objects have consistent types */
+ DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
+
+
while ((error= lock_tables(thd, rli->tables_to_lock,
rli->tables_to_lock_count, &need_reopen)))
{
@@ -6240,22 +6279,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
So we call set_time(), like in SBR. Presently it changes nothing.
*/
thd->set_time((time_t)when);
- /*
- There are a few flags that are replicated with each row event.
- Make sure to set/clear them before executing the main body of
- the event.
- */
- if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
- thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
- else
- thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
-
- if (get_flags(RELAXED_UNIQUE_CHECKS_F))
- thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
- else
- thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
- /* A small test to verify that objects have consistent types */
- DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
/*
Now we are in a statement and will stay in a statement until we
@@ -6288,8 +6311,9 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
if (!get_flags(COMPLETE_ROWS_F))
bitmap_intersect(table->write_set,&m_cols);
+ this->slave_exec_mode= slave_exec_mode_options; // fix the mode
+
// Do event specific preparations
-
error= do_before_row_operations(rli);
// row processing loop
@@ -6308,22 +6332,41 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
{
case 0:
break;
-
- /* Some recoverable errors */
+ /*
+ The following list of "idempotent" errors
+ means that an error from the list might happen
+ because of idempotent (more than once)
+ applying of a binlog file.
+      Note that if the binlog contains a DDL operation, its
+      second application may cause
+
+ case HA_ERR_TABLE_DEF_CHANGED:
+ case HA_ERR_CANNOT_ADD_FOREIGN:
+
+      which are not included in the list.
+ */
case HA_ERR_RECORD_CHANGED:
case HA_ERR_RECORD_DELETED:
case HA_ERR_KEY_NOT_FOUND:
case HA_ERR_END_OF_FILE:
- /* Idempotency support: OK if tuple does not exist */
+ case HA_ERR_FOUND_DUPP_KEY:
+ case HA_ERR_FOUND_DUPP_UNIQUE:
+ case HA_ERR_FOREIGN_DUPLICATE_KEY:
+ case HA_ERR_NO_REFERENCED_ROW:
+ case HA_ERR_ROW_IS_REFERENCED:
+
DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
- error= 0;
+ if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
+ {
+ if (global_system_variables.log_warnings)
+ slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
+ get_type_str(),
+ RPL_LOG_NAME, (ulong) log_pos);
+ error= 0;
+ }
break;
-
+
default:
- rli->report(ERROR_LEVEL, thd->net.last_errno,
- "Error in %s event: row application failed. %s",
- get_type_str(),
- thd->net.last_error ? thd->net.last_error : "");
thd->query_error= 1;
break;
}
@@ -6367,16 +6410,14 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
if (rli->tables_to_lock && get_flags(STMT_END_F))
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
-
+
if (error)
{ /* error has occured during the transaction */
- rli->report(ERROR_LEVEL, thd->net.last_errno,
- "Error in %s event: error during transaction execution "
- "on table %s.%s. %s",
- get_type_str(), table->s->db.str,
- table->s->table_name.str,
- thd->net.last_error ? thd->net.last_error : "");
-
+ slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
+ get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
+ }
+ if (error)
+ {
/*
If one day we honour --skip-slave-errors in row-based replication, and
the error should be skipped, then we would clear mappings, rollback,
@@ -6488,6 +6529,7 @@ Rows_log_event::do_update_pos(Relay_log_info *rli)
*/
thd->reset_current_stmt_binlog_row_based();
+
rli->cleanup_context(thd, 0);
if (error == 0)
{
@@ -7170,43 +7212,50 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability
{
int error= 0;
- /*
- We are using REPLACE semantics and not INSERT IGNORE semantics
- when writing rows, that is: new rows replace old rows. We need to
- inform the storage engine that it should use this behaviour.
+ /**
+     todo: introduce a property for the event (handler?) which forces
+ applying the event in the replace (idempotent) fashion.
*/
+ if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 ||
+ m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
+ {
+ /*
+ We are using REPLACE semantics and not INSERT IGNORE semantics
+ when writing rows, that is: new rows replace old rows. We need to
+ inform the storage engine that it should use this behaviour.
+ */
+
+ /* Tell the storage engine that we are using REPLACE semantics. */
+ thd->lex->duplicates= DUP_REPLACE;
+
+ /*
+ Pretend we're executing a REPLACE command: this is needed for
+ InnoDB and NDB Cluster since they are not (properly) checking the
+ lex->duplicates flag.
+ */
+ thd->lex->sql_command= SQLCOM_REPLACE;
+ /*
+      Do not raise the error flag in case of hitting a duplicate on a unique attribute
+ */
+ m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
+ /*
+ NDB specific: update from ndb master wrapped as Write_rows
+ so that the event should be applied to replace slave's row
+ */
+ m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
+ /*
+ NDB specific: if update from ndb master wrapped as Write_rows
+ does not find the row it's assumed idempotent binlog applying
+ is taking place; don't raise the error.
+ */
+ m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
+ /*
+ TODO: the cluster team (Tomas?) says that it's better if the engine knows
+ how many rows are going to be inserted, then it can allocate needed memory
+ from the start.
+ */
+ }
- /* Tell the storage engine that we are using REPLACE semantics. */
- thd->lex->duplicates= DUP_REPLACE;
-
- /*
- Pretend we're executing a REPLACE command: this is needed for
- InnoDB and NDB Cluster since they are not (properly) checking the
- lex->duplicates flag.
- */
- thd->lex->sql_command= SQLCOM_REPLACE;
- /*
- Do not raise the error flag in case of hitting to an unique attribute
- */
- m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
- /*
- NDB specific: update from ndb master wrapped as Write_rows
- */
- /*
- so that the event should be applied to replace slave's row
- */
- m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
- /*
- NDB specific: if update from ndb master wrapped as Write_rows
- does not find the row it's assumed idempotent binlog applying
- is taking place; don't raise the error.
- */
- m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
- /*
- TODO: the cluster team (Tomas?) says that it's better if the engine knows
- how many rows are going to be inserted, then it can allocate needed memory
- from the start.
- */
m_table->file->ha_start_bulk_insert(0);
/*
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
@@ -7228,18 +7277,23 @@ Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability
}
int
-Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
+Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
int error)
{
int local_error= 0;
- m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
- m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
- /*
- reseting the extra with
- table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
- fires bug#27077
- todo: explain or fix
- */
+ if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1 ||
+ m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
+ {
+ m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+ m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
+ /*
+ resetting the extra with
+ table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
+ fires bug#27077
+ explanation: file->reset() performs this duty
+ ultimately. Still todo: fix
+ */
+ }
if ((local_error= m_table->file->ha_end_bulk_insert()))
{
m_table->file->print_error(local_error, MYF(0));
@@ -7358,23 +7412,22 @@ Rows_log_event::write_row(const Relay_log_info *const rli,
while ((error= table->file->ha_write_row(table->record[0])))
{
- if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
+ if (error == HA_ERR_LOCK_DEADLOCK ||
+ error == HA_ERR_LOCK_WAIT_TIMEOUT ||
+ (keynum= table->file->get_dup_key(error)) < 0 ||
+ !overwrite)
{
- table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */
- DBUG_RETURN(error);
- }
- if ((keynum= table->file->get_dup_key(error)) < 0)
- {
- DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
- table->file->print_error(error, MYF(0));
+ DBUG_PRINT("info",("get_dup_key returns %d)", keynum));
/*
- We failed to retrieve the duplicate key
+ Deadlock, waiting for lock or just an error from the handler
+ such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
+ Retrieval of the duplicate key number may fail
- either because the error was not "duplicate key" error
- or because the information which key is not available
*/
+ table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
-
/*
We need to retrieve the old row into record[1] to be able to
either update or delete the offending record. We either:
@@ -7512,11 +7565,13 @@ int
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
{
DBUG_ASSERT(m_table != NULL);
- int error= write_row(rli, TRUE /* overwrite */);
-
+ int error=
+ write_row(rli, /* if 1 then overwrite */
+ bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
+
if (error && !thd->net.last_errno)
thd->net.last_errno= error;
-
+
return error;
}