summaryrefslogtreecommitdiff
path: root/sql/sql_insert.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r--sql/sql_insert.cc309
1 files changed, 234 insertions, 75 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 707b8a0d3bf..1f462009c15 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -3834,7 +3834,8 @@ select_insert::select_insert(THD *thd_arg, TABLE_LIST *table_list_par,
sel_result(result),
table_list(table_list_par), table(table_par), fields(fields_par),
autoinc_value_of_last_inserted_row(0),
- insert_into_view(table_list_par && table_list_par->view != 0)
+ insert_into_view(table_list_par && table_list_par->view != 0),
+ binary_logged(false), atomic_replace(false)
{
bzero((char*) &info,sizeof(info));
info.handle_duplicates= duplic;
@@ -4166,7 +4167,6 @@ bool select_insert::prepare_eof()
int error;
bool const trans_table= table->file->has_transactions_and_rollback();
bool changed;
- bool binary_logged= 0;
killed_state killed_status= thd->killed;
DBUG_ENTER("select_insert::prepare_eof");
@@ -4185,8 +4185,8 @@ bool select_insert::prepare_eof()
error= thd->get_stmt_da()->sql_errno();
if (info.ignore || info.handle_duplicates != DUP_ERROR)
- if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
- table->file->ha_rnd_end();
+ if (!atomic_replace && table->file->ha_table_flags() & HA_DUPLICATE_POS)
+ table->file->ha_rnd_end();
table->file->extra(HA_EXTRA_END_ALTER_COPY);
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
@@ -4231,6 +4231,11 @@ bool select_insert::prepare_eof()
table->file->ha_release_auto_increment();
DBUG_RETURN(true);
}
+ /*
+ FIXME: bad check !table->s->tmp_table in case of atomic_replace.
+ The better check is create_info->tmp_table(). The even better is to
+ update binary_logged in do_postlock() for RBR.
+ */
binary_logged= res == 0 || !table->s->tmp_table;
}
table->s->table_creation_was_logged|= binary_logged;
@@ -4349,6 +4354,12 @@ void select_insert::abort_result_set()
res= thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
thd->query_length(),
transactional_table, FALSE, FALSE, errcode);
+
+ /*
+ FIXME: bad check !table->s->tmp_table in case of atomic_replace.
+ The better check is create_info->tmp_table(). The even better is to
+ update binary_logged in do_postlock() for RBR.
+ */
binary_logged= res == 0 || !table->s->tmp_table;
}
if (changed)
@@ -4423,11 +4434,13 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
TABLE tmp_table; // Used during 'Create_field()'
TABLE_SHARE share;
TABLE *table= 0;
- uint select_field_count= items->elements;
+ alter_info->select_field_count= items->elements;
/* Add selected items to field list */
List_iterator_fast<Item> it(*items);
Item *item;
bool save_table_creation_was_logged;
+ int create_table_mode= CREATE_ORDINARY;
+ LEX_CUSTRING frm= {0, 0};
DBUG_ENTER("select_create::create_table_from_items");
tmp_table.s= &share;
@@ -4481,10 +4494,8 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
alter_info->create_list.push_back(cr_field, thd->mem_root);
}
- if (create_info->check_fields(thd, alter_info,
- create_table->table_name,
- create_table->db,
- select_field_count))
+ if (create_info->check_fields(thd, alter_info, create_table->table_name,
+ create_table->db))
DBUG_RETURN(NULL);
DEBUG_SYNC(thd,"create_table_select_before_create");
@@ -4499,6 +4510,20 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
create_info->mdl_ticket= create_table->table->mdl_ticket;
}
+ if (atomic_replace)
+ {
+ DBUG_ASSERT(!create_info->tmp_table()); // FIXME: test
+ if (make_tmp_name(thd, "create", create_table, &new_table))
+ DBUG_RETURN(NULL);
+ create_table_mode|= CREATE_TMP_TABLE;
+ DBUG_ASSERT(!(create_info->options & HA_CREATE_TMP_ALTER));
+ // FIXME: restore options?
+ create_info->options|= HA_CREATE_TMP_ALTER;
+ select_create::create_table= &new_table;
+ select_insert::table_list= &new_table;
+ create_info->tmp_name= &new_table;
+ }
+
/*
Create and lock table.
@@ -4516,11 +4541,14 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
open_table().
*/
- if (!mysql_create_table_no_lock(thd, &ddl_log_state_create, &ddl_log_state_rm,
+ if (!mysql_create_table_no_lock(thd,
+ &orig_table->db,
+ &orig_table->table_name,
&create_table->db,
&create_table->table_name,
create_info, alter_info, NULL,
- select_field_count, create_table))
+ create_table_mode, create_table,
+ atomic_replace ? &frm : NULL))
{
DEBUG_SYNC(thd,"create_table_select_before_open");
@@ -4530,7 +4558,44 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
*/
create_table->table= 0;
- if (!create_info->tmp_table())
+ if (atomic_replace)
+ {
+ char tmp_path[FN_REFLEN + 1];
+ build_table_filename(tmp_path, sizeof(tmp_path) - 1, new_table.db.str,
+ new_table.table_name.str, "", FN_IS_TMP);
+
+ create_table->table=
+ thd->create_and_open_tmp_table(&frm, tmp_path, orig_table->db.str,
+ orig_table->table_name.str, true);
+ /* NOTE: if create_and_open_tmp_table() fails the table is dropped by
+ * ddl_log_state_create */
+ if (create_table->table)
+ {
+ /*
+ NOTE: Aria tables require table locking to work in transactional
+ mode. Since we don't lock our temporary table we get problems with
+ unproperly initialized transactional mode: seg-fault while accessing
+ uninitialized trn member (reproduced by
+ atomic.create_replace,aria,stmt).
+
+ This hack disables logging for Aria table (that is not needed anyway
+ for a temporary table).
+ */
+ bool on_save= thd->transaction->on;
+ thd->transaction->on= false;
+ if (create_table->table->file->ha_external_lock(thd, F_WRLCK))
+ {
+ // FIXME: test
+ create_table->table= 0;
+ thd->transaction->on= on_save;
+ goto err;
+ }
+
+ thd->transaction->on= on_save;
+ create_table->table->s->can_do_row_logging= 1;
+ }
+ }
+ else if (!create_info->tmp_table())
{
Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
TABLE_LIST::enum_open_strategy save_open_strategy;
@@ -4571,13 +4636,20 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
}
else
create_table->table= 0; // Create failed
-
+
+err:
+ DBUG_ASSERT(!create_table->table || frm.str || !atomic_replace);
+ my_free(const_cast<uchar *>(frm.str));
+
if (unlikely(!(table= create_table->table)))
{
if (likely(!thd->is_error())) // CREATE ... IF NOT EXISTS
my_ok(thd); // succeed, but did nothing
+ if (ddl_log_state_rm.is_active())
+ (void) ddl_log_revert(thd, &ddl_log_state_create);
+ else
+ ddl_log_complete(&ddl_log_state_create);
ddl_log_complete(&ddl_log_state_rm);
- ddl_log_complete(&ddl_log_state_create);
DBUG_RETURN(NULL);
}
@@ -4598,8 +4670,9 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
since it won't wait for the table lock (we have exclusive metadata lock on
the table) and thus can't get aborted.
*/
- if (unlikely(!((*lock)= mysql_lock_tables(thd, &table, 1, 0)) ||
- hooks->postlock(&table, 1)))
+ if ((!atomic_replace &&
+ unlikely(!((*lock)= mysql_lock_tables(thd, &table, 1, 0)))) ||
+ hooks->postlock(&table, 1))
{
/* purecov: begin tested */
/*
@@ -4616,13 +4689,20 @@ TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
*lock= 0;
}
drop_open_table(thd, table, &create_table->db, &create_table->table_name);
+ if (ddl_log_state_rm.is_active())
+ (void) ddl_log_revert(thd, &ddl_log_state_create);
+ else
+ ddl_log_complete(&ddl_log_state_create);
ddl_log_complete(&ddl_log_state_rm);
- ddl_log_complete(&ddl_log_state_create);
DBUG_RETURN(NULL);
/* purecov: end */
}
+ DBUG_ASSERT(
+ create_info->tmp_table() ||
+ thd->mdl_context.is_lock_owner(MDL_key::TABLE, orig_table->db.str,
+ orig_table->table_name.str, MDL_SHARED));
table->s->table_creation_was_logged= save_table_creation_was_logged;
- if (!table->s->tmp_table)
+ if (!create_info->tmp_table())
table->file->prepare_for_row_logging();
/*
@@ -4677,6 +4757,11 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
private:
virtual int do_postlock(TABLE **tables, uint count)
{
+ /*
+ NOTE: for row format CREATE TABLE must be logged before row data.
+
+ TODO: Remove creepy TABLEOP_HOOKS interface?
+ */
int error;
THD *thd= const_cast<THD*>(ptr->get_thd());
TABLE_LIST *save_next_global= create_table->next_global;
@@ -4690,10 +4775,23 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
if (unlikely(error))
return error;
- TABLE const *const table = *tables;
if (thd->is_current_stmt_binlog_format_row() &&
- !table->s->tmp_table)
- return binlog_show_create_table(thd, *tables, ptr->create_info);
+ !ptr->create_info->tmp_table())
+ {
+ thd->binlog_xid= thd->query_id;
+ /*
+ Remember xid's for the case of row based logging. Note that binary
+ log is not flushed until the end of statement, so it is OK to write
+ it now and if crash happens until we closed ddl_log_state_rm we
+ won't see CREATE OR REPLACE event in the binary log.
+ */
+ ddl_log_update_xid(&ptr->ddl_log_state_create, thd->binlog_xid);
+ if (ptr->ddl_log_state_rm.is_active() &&
+ !ptr->create_info->is_atomic_replace())
+ ddl_log_update_xid(&ptr->ddl_log_state_rm, thd->binlog_xid);
+ error= binlog_show_create_table(thd, *tables, ptr->create_info);
+ return error;
+ }
return 0;
}
select_create *ptr;
@@ -4720,8 +4818,9 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
if (!(table= create_table_from_items(thd, &values, &extra_lock, hook_ptr)))
{
- if (create_info->or_replace())
+ if (create_info->or_replace() && !atomic_replace)
{
+ /* TODO: why create_info->table_was_deleted not used? */
/* Original table was deleted. We have to log it */
log_drop_table(thd, &create_table->db, &create_table->table_name,
&create_info->org_storage_engine_name,
@@ -4734,6 +4833,8 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
DBUG_RETURN(-1);
}
+ DBUG_ASSERT(table == create_table->table);
+
if (create_info->tmp_table())
{
/*
@@ -4743,14 +4844,14 @@ select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
list to keep them inaccessible from inner statements.
e.g. CREATE TEMPORARY TABLE `t1` AS SELECT * FROM `t1`;
*/
- saved_tmp_table_share= thd->save_tmp_table_share(create_table->table);
+ saved_tmp_table_share= thd->save_tmp_table_share(table);
}
if (extra_lock)
{
DBUG_ASSERT(m_plock == NULL);
- if (create_info->tmp_table())
+ if (table->s->tmp_table)
m_plock= &m_lock;
else
m_plock= &thd->extra_lock;
@@ -4845,6 +4946,10 @@ static int binlog_show_create_table(THD *thd, TABLE *table,
create_info, WITH_DB_NAME);
DBUG_ASSERT(result == 0); /* show_create_table() always return 0 */
+ /*
+ TODO (optimization): why it does show_create_table() even if
+ !mysql_bin_log.is_open()?
+ */
if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
{
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
@@ -4966,28 +5071,42 @@ bool select_create::send_eof()
is in select_insert::prepare_eof(). For that reason, we
mark the flag at this point.
*/
- if (table->s->tmp_table)
+ if (create_info->tmp_table())
thd->transaction->stmt.mark_created_temp_table();
if (thd->slave_thread)
thd->variables.binlog_annotate_row_events= 0;
+ if (atomic_replace)
+ {
+ DBUG_ASSERT(table->s->tmp_table);
+
+ int result;
+ if (table->file->ha_index_or_rnd_end() ||
+ table->file->ha_external_lock(thd, F_UNLCK))
+ {
+ abort_result_set();
+ DBUG_RETURN(true);
+ }
+
+ create_info->table= orig_table->table;
+ if (create_table_handle_exists(thd, orig_table->db, orig_table->table_name,
+ *create_info, create_info, result))
+ {
+ abort_result_set();
+ DBUG_RETURN(true);
+ }
+ }
+
debug_crash_here("ddl_log_create_before_binlog");
- /*
- In case of crash, we have to add DROP TABLE to the binary log as
- the CREATE TABLE will already be logged if we are not using row based
- replication.
- */
- if (!thd->is_current_stmt_binlog_format_row())
+ if (!thd->binlog_xid && !create_info->tmp_table())
{
- if (ddl_log_state_create.is_active()) // Not temporary table
- ddl_log_update_phase(&ddl_log_state_create, DDL_CREATE_TABLE_PHASE_LOG);
- /*
- We can ignore if we replaced an old table as ddl_log_state_create will
- now handle the logging of the drop if needed.
- */
- ddl_log_complete(&ddl_log_state_rm);
+ thd->binlog_xid= thd->query_id;
+ /* Remember xid's for the case of row based logging */
+ ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
+ if (ddl_log_state_rm.is_active() && !atomic_replace)
+ ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
}
if (prepare_eof())
@@ -4997,7 +5116,7 @@ bool select_create::send_eof()
}
debug_crash_here("ddl_log_create_after_prepare_eof");
- if (table->s->tmp_table)
+ if (create_info->tmp_table())
{
/*
Now is good time to add the new table to THD temporary tables list.
@@ -5017,17 +5136,25 @@ bool select_create::send_eof()
thd->restore_tmp_table_share(saved_tmp_table_share);
}
}
+ else if (atomic_replace)
+ {
+ create_table= orig_table;
+ create_info->table= NULL;
+ table->file->ha_reset();
+ thd->drop_temporary_table(table, NULL, false);
+ table= NULL;
+ }
/*
Do an implicit commit at end of statement for non-temporary
tables. This can fail, but we should unlock the table
nevertheless.
*/
- if (!table->s->tmp_table)
+ if (!create_info->tmp_table())
{
#ifdef WITH_WSREP
if (WSREP(thd) &&
- table->file->ht->db_type == DB_TYPE_INNODB)
+ create_info->db_type->db_type == DB_TYPE_INNODB)
{
if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
{
@@ -5062,10 +5189,6 @@ bool select_create::send_eof()
thd->get_stmt_da()->set_overwrite_status(true);
}
#endif /* WITH_WSREP */
- thd->binlog_xid= thd->query_id;
- /* Remember xid's for the case of row based logging */
- ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
- ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
trans_commit_stmt(thd);
if (!(thd->variables.option_bits & OPTION_GTID_BEGIN))
trans_commit_implicit(thd);
@@ -5102,6 +5225,11 @@ bool select_create::send_eof()
ddl_log.org_database= create_table->db;
ddl_log.org_table= create_table->table_name;
ddl_log.org_table_id= create_info->tabledef_version;
+ if (create_info->drop_entry.query.length)
+ {
+ DBUG_ASSERT(atomic_replace);
+ backup_log_ddl(&create_info->drop_entry);
+ }
backup_log_ddl(&ddl_log);
}
/*
@@ -5110,9 +5238,17 @@ bool select_create::send_eof()
(as the query was logged before commit!)
*/
debug_crash_here("ddl_log_create_after_binlog");
- ddl_log_complete(&ddl_log_state_rm);
- ddl_log_complete(&ddl_log_state_create);
- debug_crash_here("ddl_log_create_log_complete");
+ if (create_info->finalize_ddl(thd))
+ {
+ if (atomic_replace)
+ {
+ /* Now we have to log DROP_AFTER_CREATE */
+ atomic_replace= false;
+ create_table= &new_table;
+ }
+ abort_result_set();
+ DBUG_RETURN(true);
+ }
/*
exit_done must only be set after last potential call to
@@ -5120,10 +5256,9 @@ bool select_create::send_eof()
*/
exit_done= 1; // Avoid double calls
- send_ok_packet();
-
if (m_plock)
{
+ DBUG_ASSERT(!atomic_replace);
MYSQL_LOCK *lock= *m_plock;
*m_plock= NULL;
m_plock= NULL;
@@ -5142,11 +5277,37 @@ bool select_create::send_eof()
create_info->
pos_in_locked_tables,
table, lock))
+ {
+ send_ok_packet();
DBUG_RETURN(false); // ok
+ }
/* Fail. Continue without locking the table */
}
mysql_unlock_tables(thd, lock);
}
+ else if (atomic_replace && create_info->pos_in_locked_tables)
+ {
+ DBUG_ASSERT(thd->locked_tables_mode);
+ DBUG_ASSERT(thd->variables.option_bits & OPTION_TABLE_LOCK);
+ TABLE_LIST *pos_in_locked_tables= create_info->pos_in_locked_tables;
+ /*
+ Add back the deleted table and re-created table as a locked table
+ This should always work as we have a meta lock on the table.
+ */
+ thd->locked_tables_list.add_back_last_deleted_lock(pos_in_locked_tables);
+ if (thd->locked_tables_list.reopen_tables(thd, false))
+ {
+ thd->locked_tables_list.unlink_all_closed_tables(thd, NULL, 0);
+ DBUG_RETURN(true);
+ }
+ else
+ {
+ TABLE *table= pos_in_locked_tables->table;
+ table->mdl_ticket->downgrade_lock(MDL_SHARED_NO_READ_WRITE);
+ }
+ }
+
+ send_ok_packet();
DBUG_RETURN(false);
}
@@ -5191,14 +5352,14 @@ void select_create::abort_result_set()
if (table)
{
- bool tmp_table= table->s->tmp_table;
- bool table_creation_was_logged= (!tmp_table ||
- table->s->table_creation_was_logged);
+ bool tmp_table= create_info->tmp_table();
if (tmp_table)
{
DBUG_ASSERT(saved_tmp_table_share);
thd->restore_tmp_table_share(saved_tmp_table_share);
}
+ else if (atomic_replace)
+ create_table= &new_table;
if (table->file->inited &&
(info.ignore || info.handle_duplicates != DUP_ERROR) &&
@@ -5215,7 +5376,14 @@ void select_create::abort_result_set()
m_plock= NULL;
}
- drop_open_table(thd, table, &create_table->db, &create_table->table_name);
+ if (atomic_replace)
+ {
+ (void) table->file->ha_external_lock(thd, F_UNLCK);
+ (void) thd->drop_temporary_table(table, NULL, true);
+ }
+ else
+ drop_open_table(thd, table, &create_table->db,
+ &create_table->table_name);
table=0; // Safety
if (thd->log_current_statement)
{
@@ -5223,39 +5391,30 @@ void select_create::abort_result_set()
{
/* Remove logging of drop, create + insert rows */
binlog_reset_cache(thd);
- /* Original table was deleted. We have to log it */
- if (table_creation_was_logged)
- {
- thd->binlog_xid= thd->query_id;
- ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
- ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
- debug_crash_here("ddl_log_create_before_binlog");
- log_drop_table(thd, &create_table->db, &create_table->table_name,
- &create_info->org_storage_engine_name,
- create_info->db_type == partition_hton,
- &create_info->tabledef_version,
- tmp_table);
- debug_crash_here("ddl_log_create_after_binlog");
- thd->binlog_xid= 0;
- }
}
- else if (!tmp_table)
+ else if (!tmp_table && !atomic_replace)
{
backup_log_info ddl_log;
bzero(&ddl_log, sizeof(ddl_log));
ddl_log.query= { C_STRING_WITH_LEN("DROP_AFTER_CREATE") };
ddl_log.org_partitioned= (create_info->db_type == partition_hton);
ddl_log.org_storage_engine_name= create_info->org_storage_engine_name;
- ddl_log.org_database= create_table->db;
- ddl_log.org_table= create_table->table_name;
+ ddl_log.org_database= orig_table->db;
+ ddl_log.org_table= orig_table->table_name;
ddl_log.org_table_id= create_info->tabledef_version;
backup_log_ddl(&ddl_log);
}
}
}
-
- ddl_log_complete(&ddl_log_state_rm);
- ddl_log_complete(&ddl_log_state_create);
+ if (!binary_logged)
+ {
+ if (ddl_log_state_rm.is_active())
+ (void) ddl_log_revert(thd, &ddl_log_state_create);
+ else
+ ddl_log_complete(&ddl_log_state_create);
+ ddl_log_complete(&ddl_log_state_rm);
+ }
+ thd->binlog_xid= 0;
if (create_info->table_was_deleted)
{