summaryrefslogtreecommitdiff
path: root/sql/log_event_old.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-11-01 12:00:11 +0100
committerunknown <knielsen@knielsen-hq.org>2013-11-01 12:00:11 +0100
commit57a267a8c00471bbe13724e7d9ba89d23acef3c2 (patch)
tree808a9f9b165bfe034304e4974dd618eaafcdedb2 /sql/log_event_old.cc
parentbd3dc54261f10f387a03ad99ce74c3824c42e462 (diff)
parentcb86ce60b9bade5ae7712d8f3f74668208ee3fd2 (diff)
downloadmariadb-git-57a267a8c00471bbe13724e7d9ba89d23acef3c2.tar.gz
Merge from 10.0-base to 10.0 the feature MDEV-4506: Parallel replication.
The merge is still missing a few hunks related to temporary tables and InnoDB log file size. The associated code did not seem to exist in 10.0, so the merge of that needs more work. Until this is fixed, there are a number of test failures as a result.
Diffstat (limited to 'sql/log_event_old.cc')
-rw-r--r--sql/log_event_old.cc144
1 files changed, 62 insertions, 82 deletions
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc
index a050463cbf0..0de331e96c5 100644
--- a/sql/log_event_old.cc
+++ b/sql/log_event_old.cc
@@ -36,12 +36,13 @@
// Old implementation of do_apply_event()
int
-Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info *rli)
+Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
{
DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
int error= 0;
THD *ev_thd= ev->thd;
uchar const *row_start= ev->m_rows_buf;
+ const Relay_log_info *rli= rgi->rli;
/*
If m_table_id == ~0UL, then we have a dummy event that does not
@@ -57,7 +58,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
*/
DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F));
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
+ rgi->slave_close_thread_tables(ev_thd);
ev_thd->clear_error();
DBUG_RETURN(0);
}
@@ -67,7 +68,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
do_apply_event(). We still check here to prevent future coding
errors.
*/
- DBUG_ASSERT(rli->sql_thd == ev_thd);
+ DBUG_ASSERT(rgi->thd == ev_thd);
/*
If there is no locks taken, this is the first binrow event seen
@@ -98,7 +99,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
*/
ev_thd->lex->set_stmt_row_injection();
- if (open_and_lock_tables(ev_thd, rli->tables_to_lock, FALSE, 0))
+ if (open_and_lock_tables(ev_thd, rgi->tables_to_lock, FALSE, 0))
{
uint actual_error= ev_thd->get_stmt_da()->sql_errno();
if (ev_thd->is_slave_error || ev_thd->is_fatal_error)
@@ -113,7 +114,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
"unexpected success or fatal error"));
ev_thd->is_slave_error= 1;
}
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ rgi->slave_close_thread_tables(thd);
DBUG_RETURN(actual_error);
}
@@ -126,8 +127,8 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
*/
{
- RPL_TABLE_LIST *ptr= rli->tables_to_lock;
- for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count);
+ RPL_TABLE_LIST *ptr= rgi->tables_to_lock;
+ for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count);
ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++)
{
DBUG_ASSERT(ptr->m_tabledef_valid);
@@ -136,7 +137,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
ptr->table, &conv_table))
{
ev_thd->is_slave_error= 1;
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
+ rgi->slave_close_thread_tables(ev_thd);
DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
}
DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
@@ -161,15 +162,15 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
Old_rows_log_event, we can invalidate the query cache for the
associated table.
*/
- TABLE_LIST *ptr= rli->tables_to_lock;
- for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++)
- const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
+ TABLE_LIST *ptr= rgi->tables_to_lock;
+ for (uint i=0; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++)
+ rgi->m_table_map.set_table(ptr->table_id, ptr->table);
#ifdef HAVE_QUERY_CACHE
- query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock);
+ query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
#endif
}
- TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id);
+ TABLE* table= rgi->m_table_map.get_table(ev->m_table_id);
if (table)
{
@@ -205,22 +206,11 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
- /*
- Now we are in a statement and will stay in a statement until we
- see a STMT_END_F.
-
- We set this flag here, before actually applying any rows, in
- case the SQL thread is stopped and we need to detect that we're
- inside a statement and halting abruptly might cause problems
- when restarting.
- */
- const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
-
error= do_before_row_operations(table);
while (error == 0 && row_start < ev->m_rows_end)
{
uchar const *row_end= NULL;
- if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end)))
+ if ((error= do_prepare_row(ev_thd, rgi, table, row_start, &row_end)))
break; // We should perform the after-row operation even in
// the case of error
@@ -280,7 +270,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info
rollback at the caller along with sbr.
*/
ev_thd->reset_current_stmt_binlog_format_row();
- const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error);
+ rgi->cleanup_context(ev_thd, error);
ev_thd->is_slave_error= 1;
DBUG_RETURN(error);
}
@@ -953,7 +943,7 @@ int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
int
Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
- Relay_log_info const *rli,
+ rpl_group_info *rgi,
TABLE *table,
uchar const *row_start,
uchar const **row_end)
@@ -962,7 +952,7 @@ Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
DBUG_ASSERT(row_start && row_end);
int error;
- error= unpack_row_old(const_cast<Relay_log_info*>(rli),
+ error= unpack_row_old(rgi,
table, m_width, table->record[0],
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
@@ -1037,7 +1027,7 @@ int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
int
Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
- Relay_log_info const *rli,
+ rpl_group_info *rgi,
TABLE *table,
uchar const *row_start,
uchar const **row_end)
@@ -1050,7 +1040,7 @@ Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
*/
DBUG_ASSERT(table->s->fields >= m_width);
- error= unpack_row_old(const_cast<Relay_log_info*>(rli),
+ error= unpack_row_old(rgi,
table, m_width, table->record[0],
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
@@ -1134,7 +1124,7 @@ int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
- Relay_log_info const *rli,
+ rpl_group_info *rgi,
TABLE *table,
uchar const *row_start,
uchar const **row_end)
@@ -1148,14 +1138,14 @@ int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
DBUG_ASSERT(table->s->fields >= m_width);
/* record[0] is the before image for the update */
- error= unpack_row_old(const_cast<Relay_log_info*>(rli),
+ error= unpack_row_old(rgi,
table, m_width, table->record[0],
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
row_start = *row_end;
/* m_after_image is the after image for the update */
- error= unpack_row_old(const_cast<Relay_log_info*>(rli),
+ error= unpack_row_old(rgi,
table, m_width, m_after_image,
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
@@ -1451,10 +1441,11 @@ int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
+int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
{
DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
int error= 0;
+ Relay_log_info const *rli= rgi->rli;
/*
If m_table_id == ~0UL, then we have a dummy event that does not
@@ -1470,7 +1461,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
DBUG_ASSERT(get_flags(STMT_END_F));
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ rgi->slave_close_thread_tables(thd);
thd->clear_error();
DBUG_RETURN(0);
}
@@ -1480,7 +1471,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
do_apply_event(). We still check here to prevent future coding
errors.
*/
- DBUG_ASSERT(rli->sql_thd == thd);
+ DBUG_ASSERT(rgi->thd == thd);
/*
If there is no locks taken, this is the first binrow event seen
@@ -1498,8 +1489,8 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
lex_start(thd);
- if ((error= lock_tables(thd, rli->tables_to_lock,
- rli->tables_to_lock_count, 0)))
+ if ((error= lock_tables(thd, rgi->tables_to_lock,
+ rgi->tables_to_lock_count, 0)))
{
if (thd->is_slave_error || thd->is_fatal_error)
{
@@ -1521,7 +1512,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
"Error in %s event: when locking tables",
get_type_str());
}
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ rgi->slave_close_thread_tables(thd);
DBUG_RETURN(error);
}
@@ -1534,8 +1525,8 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
{
- RPL_TABLE_LIST *ptr= rli->tables_to_lock;
- for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count);
+ RPL_TABLE_LIST *ptr= rgi->tables_to_lock;
+ for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count);
ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++)
{
TABLE *conv_table;
@@ -1543,7 +1534,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
ptr->table, &conv_table))
{
thd->is_slave_error= 1;
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ rgi->slave_close_thread_tables(thd);
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
ptr->m_conv_table= conv_table;
@@ -1565,18 +1556,18 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
Old_rows_log_event, we can invalidate the query cache for the
associated table.
*/
- for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
+ for (TABLE_LIST *ptr= rgi->tables_to_lock ; ptr ; ptr= ptr->next_global)
{
- const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
+ rgi->m_table_map.set_table(ptr->table_id, ptr->table);
}
#ifdef HAVE_QUERY_CACHE
- query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock);
+ query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
#endif
}
TABLE*
table=
- m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
+ m_table= rgi->m_table_map.get_table(m_table_id);
if (table)
{
@@ -1612,17 +1603,6 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
- /*
- Now we are in a statement and will stay in a statement until we
- see a STMT_END_F.
-
- We set this flag here, before actually applying any rows, in
- case the SQL thread is stopped and we need to detect that we're
- inside a statement and halting abruptly might cause problems
- when restarting.
- */
- const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
-
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
set_flags(COMPLETE_ROWS_F);
@@ -1656,7 +1636,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
if (!table->in_use)
table->in_use= thd;
- error= do_exec_row(rli);
+ error= do_exec_row(rgi);
DBUG_PRINT("info", ("error: %d", error));
DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
@@ -1695,7 +1675,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
(ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
if (!m_curr_row_end && !error)
- unpack_current_row(rli);
+ unpack_current_row(rgi);
// at this moment m_curr_row_end should be set
DBUG_ASSERT(error || m_curr_row_end != NULL);
@@ -1732,7 +1712,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
rollback at the caller along with sbr.
*/
thd->reset_current_stmt_binlog_format_row();
- const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
+ rgi->cleanup_context(thd, error);
thd->is_slave_error= 1;
DBUG_RETURN(error);
}
@@ -1761,7 +1741,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
problem. When WL#2975 is implemented, just remove the member
Relay_log_info::last_event_start_time and all its occurrences.
*/
- const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
+ rgi->last_event_start_time= my_time(0);
}
if (get_flags(STMT_END_F))
@@ -1811,7 +1791,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
*/
thd->reset_current_stmt_binlog_format_row();
- const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
+ rgi->cleanup_context(thd, 0);
}
DBUG_RETURN(error);
@@ -1819,22 +1799,23 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
Log_event::enum_skip_reason
-Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
+Old_rows_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
If the slave skip counter is 1 and this event does not end a
statement, then we should not start executing on the next event.
Otherwise, we defer the decision to the normal skipping logic.
*/
- if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
+ if (rgi->rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
return Log_event::EVENT_SKIP_IGNORE;
else
- return Log_event::do_shall_skip(rli);
+ return Log_event::do_shall_skip(rgi);
}
int
-Old_rows_log_event::do_update_pos(Relay_log_info *rli)
+Old_rows_log_event::do_update_pos(rpl_group_info *rgi)
{
+ Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Old_rows_log_event::do_update_pos");
int error= 0;
@@ -1848,7 +1829,7 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli)
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
- rli->stmt_done(log_pos, when, thd);
+ rli->stmt_done(log_pos, when, thd, rgi);
/*
Clear any errors in thd->net.last_err*. It is not known if this is
needed or not. It is believed that any errors that may exist in
@@ -1859,7 +1840,7 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli)
}
else
{
- rli->inc_event_relay_log_pos();
+ rgi->inc_event_relay_log_pos();
}
DBUG_RETURN(error);
@@ -1996,8 +1977,7 @@ void Old_rows_log_event::print_helper(FILE *file,
*/
int
-Old_rows_log_event::write_row(const Relay_log_info *const rli,
- const bool overwrite)
+Old_rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite)
{
DBUG_ENTER("write_row");
DBUG_ASSERT(m_table != NULL && thd != NULL);
@@ -2014,7 +1994,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli,
DBUG_RETURN(error);
/* unpack row into table->record[0] */
- error= unpack_current_row(rli); // TODO: how to handle errors?
+ error= unpack_current_row(rgi); // TODO: how to handle errors?
#ifndef DBUG_OFF
DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
@@ -2121,7 +2101,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli,
if (!get_flags(COMPLETE_ROWS_F))
{
restore_record(table,record[1]);
- error= unpack_current_row(rli);
+ error= unpack_current_row(rgi);
}
#ifndef DBUG_OFF
@@ -2216,7 +2196,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli,
for any following update/delete command.
*/
-int Old_rows_log_event::find_row(const Relay_log_info *rli)
+int Old_rows_log_event::find_row(rpl_group_info *rgi)
{
DBUG_ENTER("find_row");
@@ -2229,7 +2209,7 @@ int Old_rows_log_event::find_row(const Relay_log_info *rli)
// TODO: shall we check and report errors here?
prepare_record(table, m_width, FALSE /* don't check errors */);
- error= unpack_current_row(rli);
+ error= unpack_current_row(rgi);
#ifndef DBUG_OFF
DBUG_PRINT("info",("looking for the following record"));
@@ -2601,10 +2581,10 @@ Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabili
int
-Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
+Write_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table != NULL);
- int error= write_row(rli, TRUE /* overwrite */);
+ int error= write_row(rgi, TRUE /* overwrite */);
if (error && !thd->net.last_errno)
thd->net.last_errno= error;
@@ -2703,12 +2683,12 @@ Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil
}
-int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
+int Delete_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
{
int error;
DBUG_ASSERT(m_table != NULL);
- if (!(error= find_row(rli)))
+ if (!(error= find_row(rgi)))
{
/*
Delete the record found, located in record[0]
@@ -2802,11 +2782,11 @@ Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil
int
-Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
+Update_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table != NULL);
- int error= find_row(rli);
+ int error= find_row(rgi);
if (error)
{
/*
@@ -2814,7 +2794,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
able to skip to the next pair of updates
*/
m_curr_row= m_curr_row_end;
- unpack_current_row(rli);
+ unpack_current_row(rgi);
return error;
}
@@ -2832,7 +2812,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
store_record(m_table,record[1]);
m_curr_row= m_curr_row_end;
- error= unpack_current_row(rli); // this also updates m_curr_row_end
+ error= unpack_current_row(rgi); // this also updates m_curr_row_end
/*
Now we have the right row to update. The old row (the one we're