diff options
Diffstat (limited to 'sql/log_event_old.cc')
-rw-r--r-- | sql/log_event_old.cc | 147 |
1 files changed, 64 insertions, 83 deletions
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index 6623d7655d7..211768150af 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -36,12 +36,13 @@ // Old implementation of do_apply_event() int -Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info *rli) +Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) { DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)"); int error= 0; THD *ev_thd= ev->thd; uchar const *row_start= ev->m_rows_buf; + const Relay_log_info *rli= rgi->rli; /* If m_table_id == ~0UL, then we have a dummy event that does not @@ -57,7 +58,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info */ DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F)); - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd); + rgi->slave_close_thread_tables(ev_thd); ev_thd->clear_error(); DBUG_RETURN(0); } @@ -67,7 +68,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info do_apply_event(). We still check here to prevent future coding errors. */ - DBUG_ASSERT(rli->sql_thd == ev_thd); + DBUG_ASSERT(rgi->thd == ev_thd); /* If there is no locks taken, this is the first binrow event seen @@ -86,8 +87,9 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info call might reset the value of current_stmt_binlog_format, so we need to do any changes to that value after this function. */ + delete_explain_query(thd->lex); lex_start(ev_thd); - mysql_reset_thd_for_next_command(ev_thd, 0); + mysql_reset_thd_for_next_command(ev_thd); /* This is a row injection, so we flag the "statement" as @@ -97,7 +99,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info */ ev_thd->lex->set_stmt_row_injection(); - if (open_and_lock_tables(ev_thd, rli->tables_to_lock, FALSE, 0)) + if (open_and_lock_tables(ev_thd, rgi->tables_to_lock, FALSE, 0)) { uint actual_error= ev_thd->get_stmt_da()->sql_errno(); if (ev_thd->is_slave_error || ev_thd->is_fatal_error) @@ -112,7 +114,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info "unexpected success or fatal error")); ev_thd->is_slave_error= 1; } - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(actual_error); } @@ -125,8 +127,8 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info */ { - RPL_TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count); + RPL_TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count); ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++) { DBUG_ASSERT(ptr->m_tabledef_valid); @@ -135,7 +137,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info ptr->table, &conv_table)) { ev_thd->is_slave_error= 1; - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd); + rgi->slave_close_thread_tables(ev_thd); DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF); } DBUG_PRINT("debug", ("Table: %s.%s is compatible with master" @@ -160,15 +162,15 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info Old_rows_log_event, we can invalidate the query cache for the associated table. */ - TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++) - const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); + TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i=0; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++) + rgi->m_table_map.set_table(ptr->table_id, ptr->table); #ifdef HAVE_QUERY_CACHE - query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock); + query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); #endif } - TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id); + TABLE* table= rgi->m_table_map.get_table(ev->m_table_id); if (table) { @@ -204,22 +206,11 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info /* A small test to verify that objects have consistent types */ DBUG_ASSERT(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); - /* - Now we are in a statement and will stay in a statement until we - see a STMT_END_F. - - We set this flag here, before actually applying any rows, in - case the SQL thread is stopped and we need to detect that we're - inside a statement and halting abruptly might cause problems - when restarting. - */ - const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT); - error= do_before_row_operations(table); while (error == 0 && row_start < ev->m_rows_end) { uchar const *row_end= NULL; - if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end))) + if ((error= do_prepare_row(ev_thd, rgi, table, row_start, &row_end))) break; // We should perform the after-row operation even in // the case of error @@ -279,7 +270,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info rollback at the caller along with sbr. */ ev_thd->reset_current_stmt_binlog_format_row(); - const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error); + rgi->cleanup_context(ev_thd, error); ev_thd->is_slave_error= 1; DBUG_RETURN(error); } @@ -952,7 +943,7 @@ int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error) int Write_rows_log_event_old::do_prepare_row(THD *thd_arg, - Relay_log_info const *rli, + rpl_group_info *rgi, TABLE *table, uchar const *row_start, uchar const **row_end) @@ -961,7 +952,7 @@ Write_rows_log_event_old::do_prepare_row(THD *thd_arg, DBUG_ASSERT(row_start && row_end); int error; - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, table->record[0], row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, @@ -1036,7 +1027,7 @@ int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error) int Delete_rows_log_event_old::do_prepare_row(THD *thd_arg, - Relay_log_info const *rli, + rpl_group_info *rgi, TABLE *table, uchar const *row_start, uchar const **row_end) @@ -1049,7 +1040,7 @@ Delete_rows_log_event_old::do_prepare_row(THD *thd_arg, */ DBUG_ASSERT(table->s->fields >= m_width); - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, table->record[0], row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, @@ -1133,7 +1124,7 @@ int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error) int Update_rows_log_event_old::do_prepare_row(THD *thd_arg, - Relay_log_info const *rli, + rpl_group_info *rgi, TABLE *table, uchar const *row_start, uchar const **row_end) @@ -1147,14 +1138,14 @@ int Update_rows_log_event_old::do_prepare_row(THD *thd_arg, DBUG_ASSERT(table->s->fields >= m_width); /* record[0] is the before image for the update */ - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, table->record[0], row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, table->read_set, PRE_GA_UPDATE_ROWS_EVENT); row_start = *row_end; /* m_after_image is the after image for the update */ - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, m_after_image, row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, @@ -1450,10 +1441,11 @@ int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) +int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) { DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)"); int error= 0; + Relay_log_info const *rli= rgi->rli; /* If m_table_id == ~0UL, then we have a dummy event that does not @@ -1469,7 +1461,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) */ DBUG_ASSERT(get_flags(STMT_END_F)); - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); thd->clear_error(); DBUG_RETURN(0); } @@ -1479,7 +1471,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) do_apply_event(). We still check here to prevent future coding errors. */ - DBUG_ASSERT(rli->sql_thd == thd); + DBUG_ASSERT(rgi->thd == thd); /* If there is no locks taken, this is the first binrow event seen @@ -1497,8 +1489,8 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) */ lex_start(thd); - if ((error= lock_tables(thd, rli->tables_to_lock, - rli->tables_to_lock_count, 0))) + if ((error= lock_tables(thd, rgi->tables_to_lock, + rgi->tables_to_lock_count, 0))) { if (thd->is_slave_error || thd->is_fatal_error) { @@ -1520,7 +1512,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) "Error in %s event: when locking tables", get_type_str()); } - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(error); } @@ -1533,8 +1525,8 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) */ { - RPL_TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count); + RPL_TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count); ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++) { TABLE *conv_table; @@ -1542,7 +1534,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) ptr->table, &conv_table)) { thd->is_slave_error= 1; - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(ERR_BAD_TABLE_DEF); } ptr->m_conv_table= conv_table; @@ -1564,18 +1556,18 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) Old_rows_log_event, we can invalidate the query cache for the associated table. */ - for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global) + for (TABLE_LIST *ptr= rgi->tables_to_lock ; ptr ; ptr= ptr->next_global) { - const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); + rgi->m_table_map.set_table(ptr->table_id, ptr->table); } #ifdef HAVE_QUERY_CACHE - query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock); + query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); #endif } TABLE* table= - m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id); + m_table= rgi->m_table_map.get_table(m_table_id); if (table) { @@ -1611,17 +1603,6 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) /* A small test to verify that objects have consistent types */ DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); - /* - Now we are in a statement and will stay in a statement until we - see a STMT_END_F. - - We set this flag here, before actually applying any rows, in - case the SQL thread is stopped and we need to detect that we're - inside a statement and halting abruptly might cause problems - when restarting. - */ - const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT); - if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols)) set_flags(COMPLETE_ROWS_F); @@ -1655,7 +1636,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) if (!table->in_use) table->in_use= thd; - error= do_exec_row(rli); + error= do_exec_row(rgi); DBUG_PRINT("info", ("error: %d", error)); DBUG_ASSERT(error != HA_ERR_RECORD_DELETED); @@ -1694,7 +1675,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end)); if (!m_curr_row_end && !error) - unpack_current_row(rli); + unpack_current_row(rgi); // at this moment m_curr_row_end should be set DBUG_ASSERT(error || m_curr_row_end != NULL); @@ -1731,7 +1712,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) rollback at the caller along with sbr. */ thd->reset_current_stmt_binlog_format_row(); - const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); + rgi->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); } @@ -1760,7 +1741,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) problem. When WL#2975 is implemented, just remove the member Relay_log_info::last_event_start_time and all its occurrences. */ - const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); + rgi->last_event_start_time= my_time(0); } if (get_flags(STMT_END_F)) @@ -1810,7 +1791,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) */ thd->reset_current_stmt_binlog_format_row(); - const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); + rgi->cleanup_context(thd, 0); } DBUG_RETURN(error); @@ -1818,22 +1799,23 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) Log_event::enum_skip_reason -Old_rows_log_event::do_shall_skip(Relay_log_info *rli) +Old_rows_log_event::do_shall_skip(rpl_group_info *rgi) { /* If the slave skip counter is 1 and this event does not end a statement, then we should not start executing on the next event. Otherwise, we defer the decision to the normal skipping logic. */ - if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F)) + if (rgi->rli->slave_skip_counter == 1 && !get_flags(STMT_END_F)) return Log_event::EVENT_SKIP_IGNORE; else - return Log_event::do_shall_skip(rli); + return Log_event::do_shall_skip(rgi); } int -Old_rows_log_event::do_update_pos(Relay_log_info *rli) +Old_rows_log_event::do_update_pos(rpl_group_info *rgi) { + Relay_log_info *rli= rgi->rli; DBUG_ENTER("Old_rows_log_event::do_update_pos"); int error= 0; @@ -1847,7 +1829,7 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli) Step the group log position if we are not in a transaction, otherwise increase the event log position. */ - rli->stmt_done(log_pos, when, thd); + rli->stmt_done(log_pos, when, thd, rgi); /* Clear any errors in thd->net.last_err*. It is not known if this is needed or not. It is believed that any errors that may exist in @@ -1858,7 +1840,7 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli) } else { - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); } DBUG_RETURN(error); @@ -1995,8 +1977,7 @@ void Old_rows_log_event::print_helper(FILE *file, */ int -Old_rows_log_event::write_row(const Relay_log_info *const rli, - const bool overwrite) +Old_rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite) { DBUG_ENTER("write_row"); DBUG_ASSERT(m_table != NULL && thd != NULL); @@ -2013,7 +1994,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli, DBUG_RETURN(error); /* unpack row into table->record[0] */ - error= unpack_current_row(rli); // TODO: how to handle errors? + error= unpack_current_row(rgi); // TODO: how to handle errors? #ifndef DBUG_OFF DBUG_DUMP("record[0]", table->record[0], table->s->reclength); @@ -2120,7 +2101,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli, if (!get_flags(COMPLETE_ROWS_F)) { restore_record(table,record[1]); - error= unpack_current_row(rli); + error= unpack_current_row(rgi); } #ifndef DBUG_OFF @@ -2215,7 +2196,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli, for any following update/delete command. */ -int Old_rows_log_event::find_row(const Relay_log_info *rli) +int Old_rows_log_event::find_row(rpl_group_info *rgi) { DBUG_ENTER("find_row"); @@ -2228,7 +2209,7 @@ int Old_rows_log_event::find_row(const Relay_log_info *rli) // TODO: shall we check and report errors here? prepare_record(table, m_width, FALSE /* don't check errors */); - error= unpack_current_row(rli); + error= unpack_current_row(rgi); #ifndef DBUG_OFF DBUG_PRINT("info",("looking for the following record")); @@ -2600,10 +2581,10 @@ Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabili int -Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +Write_rows_log_event_old::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); - int error= write_row(rli, TRUE /* overwrite */); + int error= write_row(rgi, TRUE /* overwrite */); if (error && !thd->net.last_errno) thd->net.last_errno= error; @@ -2702,12 +2683,12 @@ Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil } -int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +int Delete_rows_log_event_old::do_exec_row(rpl_group_info *rgi) { int error; DBUG_ASSERT(m_table != NULL); - if (!(error= find_row(rli))) + if (!(error= find_row(rgi))) { /* Delete the record found, located in record[0] @@ -2801,11 +2782,11 @@ Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil int -Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +Update_rows_log_event_old::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); - int error= find_row(rli); + int error= find_row(rgi); if (error) { /* @@ -2813,7 +2794,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) able to skip to the next pair of updates */ m_curr_row= m_curr_row_end; - unpack_current_row(rli); + unpack_current_row(rgi); return error; } @@ -2831,7 +2812,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) store_record(m_table,record[1]); m_curr_row= m_curr_row_end; - error= unpack_current_row(rli); // this also updates m_curr_row_end + error= unpack_current_row(rgi); // this also updates m_curr_row_end /* Now we have the right row to update. The old row (the one we're |