summaryrefslogtreecommitdiff
path: root/sql/sql_insert.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r--sql/sql_insert.cc1473
1 files changed, 775 insertions, 698 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index a4943d5e13a..54f94ce78c1 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -55,18 +55,34 @@
*/
-#include "mysql_priv.h"
+#include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */
+#include "sql_priv.h"
+#include "unireg.h" // REQUIRED: for other includes
+#include "sql_insert.h"
+#include "sql_update.h" // compare_record
+#include "sql_base.h" // close_thread_tables
+#include "sql_cache.h" // query_cache_*
+#include "key.h" // key_copy
+#include "lock.h" // mysql_unlock_tables
#include "sp_head.h"
+#include "sql_view.h" // check_key_in_view, insert_view_fields
+#include "sql_table.h" // mysql_create_table_no_lock
+#include "sql_acl.h" // *_ACL, check_grant_all_columns
#include "sql_trigger.h"
#include "sql_select.h"
#include "sql_show.h"
#include "slave.h"
+#include "sql_parse.h" // end_active_trans
#include "rpl_mi.h"
+#include "transaction.h"
+#include "sql_audit.h"
+#include "sql_derived.h" // mysql_handle_derived
#include "debug_sync.h"
#ifndef EMBEDDED_LIBRARY
-static bool delayed_get_table(THD *thd, TABLE_LIST *table_list);
+static bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
+ TABLE_LIST *table_list);
static int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
LEX_STRING query, bool ignore, bool log_on);
static void end_delayed_insert(THD *thd);
@@ -75,7 +91,6 @@ static void unlink_blobs(register TABLE *table);
#endif
static bool check_view_insertability(THD *thd, TABLE_LIST *view);
-
/*
Check that insert/update fields are from the same single table of a view.
@@ -124,6 +139,7 @@ bool check_view_single_update(List<Item> &fields, List<Item> *values,
/* Convert to real table bits */
tables&= ~PSEUDO_TABLE_BITS;
+
/* Check found map against provided map */
if (*map)
{
@@ -266,7 +282,7 @@ static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
if (table_list->is_view() && table_list->is_merged_derived())
{
- if (check_view_single_update(fields,
+ if (check_view_single_update(fields,
fields_and_values_from_different_maps ?
(List<Item>*) 0 : &values,
table_list, map, true))
@@ -423,9 +439,9 @@ void prepare_triggers_for_insert_stmt(TABLE *table)
downgrade the lock in handler::store_lock() method.
*/
-void upgrade_lock_type_for_insert(THD *thd, thr_lock_type *lock_type,
- enum_duplicates duplic,
- bool is_multi_insert)
+static
+void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
+ enum_duplicates duplic)
{
if (duplic == DUP_UPDATE ||
(duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
@@ -460,7 +476,7 @@ void upgrade_lock_type_for_insert(THD *thd, thr_lock_type *lock_type,
*/
if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
thd->variables.max_insert_delayed_threads == 0 ||
- thd->prelocked_mode ||
+ thd->locked_tables_mode > LTM_LOCK_TABLES ||
thd->lex->uses_stored_routines())
{
*lock_type= TL_WRITE;
@@ -474,10 +490,9 @@ void upgrade_lock_type_for_insert(THD *thd, thr_lock_type *lock_type,
return;
}
- bool log_on= (thd->options & OPTION_BIN_LOG ||
- ! (thd->security_ctx->master_access & SUPER_ACL));
+ bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
if (global_system_variables.binlog_format == BINLOG_FORMAT_STMT &&
- log_on && mysql_bin_log.is_open() && is_multi_insert)
+ log_on && mysql_bin_log.is_open())
{
/*
Statement-based binary logging does not work in this case, because:
@@ -534,47 +549,78 @@ void upgrade_lock_type_for_insert(THD *thd, thr_lock_type *lock_type,
static
bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
{
+ MDL_request protection_request;
DBUG_ENTER("open_and_lock_for_insert_delayed");
#ifndef EMBEDDED_LIBRARY
- if (thd->locked_tables && thd->global_read_lock)
- {
- /*
- If this connection has the global read lock, the handler thread
- will not be able to lock the table. It will wait for the global
- read lock to go away, but this will never happen since the
- connection thread will be stuck waiting for the handler thread
- to open and lock the table.
- If we are not in locked tables mode, INSERT will seek protection
- against the global read lock (and fail), thus we will only get
- to this point in locked tables mode.
- */
- my_error(ER_CANT_UPDATE_WITH_READLOCK, MYF(0));
+ /*
+ In order for the deadlock detector to be able to find any deadlocks
+ caused by the handler thread waiting for GRL or this table, we acquire
+ protection against GRL (global IX metadata lock) and metadata lock on
+ table to being inserted into inside the connection thread.
+ If this goes ok, the tickets are cloned and added to the list of granted
+ locks held by the handler thread.
+ */
+ if (thd->global_read_lock.can_acquire_protection())
DBUG_RETURN(TRUE);
- }
- if (delayed_get_table(thd, table_list))
+ protection_request.init(MDL_key::GLOBAL, "", "", MDL_INTENTION_EXCLUSIVE,
+ MDL_STATEMENT);
+
+ if (thd->mdl_context.acquire_lock(&protection_request,
+ thd->variables.lock_wait_timeout))
DBUG_RETURN(TRUE);
- if (table_list->table)
+ if (thd->mdl_context.acquire_lock(&table_list->mdl_request,
+ thd->variables.lock_wait_timeout))
+ /*
+ If a lock can't be acquired, it makes no sense to try normal insert.
+ Therefore we just abort the statement.
+ */
+ DBUG_RETURN(TRUE);
+
+ bool error= FALSE;
+ if (delayed_get_table(thd, &protection_request, table_list))
+ error= TRUE;
+ else if (table_list->table)
{
/*
Open tables used for sub-selects or in stored functions, will also
cache these functions.
*/
- if (open_and_lock_tables(thd, table_list->next_global))
+ if (open_and_lock_tables(thd, table_list->next_global, TRUE, 0))
{
end_delayed_insert(thd);
- DBUG_RETURN(TRUE);
+ error= TRUE;
+ }
+ else
+ {
+ /*
+ First table was not processed by open_and_lock_tables(),
+ we need to set updatability flag "by hand".
+ */
+ if (!table_list->derived && !table_list->view)
+ table_list->updatable= 1; // usual table
}
- /*
- First table was not processed by open_and_lock_tables(),
- we need to set updatability flag "by hand".
- */
- if (!table_list->derived && !table_list->view)
- table_list->updatable= 1; // usual table
- DBUG_RETURN(FALSE);
}
+
+ /*
+ We can't release protection against GRL and metadata lock on the table
+ being inserted into here. These locks might be required, for example,
+ because this INSERT DELAYED calls functions which may try to update
+ this or another tables (updating the same table is of course illegal,
+ but such an attempt can be discovered only later during statement
+ execution).
+ */
+
+ /*
+ Reset the ticket in case we end up having to use normal insert and
+ therefore will reopen the table and reacquire the metadata lock.
+ */
+ table_list->mdl_request.ticket= NULL;
+
+ if (error || table_list->table)
+ DBUG_RETURN(error);
#endif
/*
* This is embedded library and we don't have auxiliary
@@ -586,7 +632,31 @@ bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
Use a normal insert.
*/
table_list->lock_type= TL_WRITE;
- DBUG_RETURN(open_and_lock_tables(thd, table_list));
+ DBUG_RETURN(open_and_lock_tables(thd, table_list, TRUE, 0));
+}
+
+
+/**
+ Create a new query string for removing DELAYED keyword for
+ multi INSERT DEALAYED statement.
+
+ @param[in] thd Thread handler
+ @param[in] buf Query string
+
+ @return
+ 0 ok
+ 1 error
+*/
+static int
+create_insert_stmt_from_insert_delayed(THD *thd, String *buf)
+{
+ /* Make a copy of thd->query() and then remove the "DELAYED" keyword */
+ if (buf->append(thd->query()) ||
+ buf->replace(thd->lex->keyword_delayed_begin_offset,
+ thd->lex->keyword_delayed_end_offset -
+ thd->lex->keyword_delayed_begin_offset, 0))
+ return 1;
+ return 0;
}
@@ -625,10 +695,9 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
/*
log_on is about delayed inserts only.
By default, both logs are enabled (this won't cause problems if the server
- runs without --log-update or --log-bin).
+ runs without --log-bin).
*/
- bool log_on= ((thd->options & OPTION_BIN_LOG) ||
- (!(thd->security_ctx->master_access & SUPER_ACL)));
+ bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
#endif
thr_lock_type lock_type;
Item *unused_conds= 0;
@@ -638,16 +707,17 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
Upgrade lock type if the requested lock is incompatible with
the current connection mode or table operation.
*/
- upgrade_lock_type_for_insert(thd, &table_list->lock_type, duplic,
- values_list.elements > 1);
+ upgrade_lock_type(thd, &table_list->lock_type, duplic);
/*
We can't write-delayed into a table locked with LOCK TABLES:
this will lead to a deadlock, since the delayed thread will
never be able to get a lock on the table.
*/
- if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
- find_locked_table(thd, table_list->db, table_list->table_name))
+ if (table_list->lock_type == TL_WRITE_DELAYED &&
+ thd->locked_tables_mode &&
+ find_locked_table(thd->open_tables, table_list->db,
+ table_list->table_name))
{
my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
table_list->table_name);
@@ -667,7 +737,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
}
else
{
- if (open_and_lock_tables(thd, table_list))
+ if (open_and_lock_tables(thd, table_list, TRUE, 0))
DBUG_RETURN(TRUE);
}
@@ -782,7 +852,15 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
{
if (duplic != DUP_ERROR || ignore)
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
- if (!thd->prelocked_mode && values_list.elements > 1)
+ /**
+ This is a simple check for the case when the table has a trigger
+ that reads from it, or when the statement invokes a stored function
+ that reads from the table being inserted to.
+ Engines can't handle a bulk insert in parallel with a read form the
+ same table in the same connection.
+ */
+ if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
+ values_list.elements > 1)
{
using_bulk_insert= 1;
table->file->ha_start_bulk_insert(values_list.elements);
@@ -887,7 +965,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
error=write_record(thd, table ,&info);
if (error)
break;
- thd->row_count++;
+ thd->warning_info->inc_current_row_for_warning();
}
free_underlaid_joins(thd, &thd->lex->select_lex);
@@ -933,6 +1011,10 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
*/
query_cache_invalidate3(thd, table_list, 1);
}
+
+ if (thd->transaction.stmt.modified_non_trans_table)
+ thd->transaction.all.modified_non_trans_table= TRUE;
+
if (error <= 0 ||
thd->transaction.stmt.modified_non_trans_table ||
was_insert_delayed)
@@ -947,7 +1029,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
thd->net.last_error/errno. For example if there has
been a disk full error when writing the row, and it was
MyISAM, then thd->net.last_error/errno will be set to
- "disk full"... and the my_pwrite() will wait until free
+ "disk full"... and the mysql_file_pwrite() will wait until free
space appears, and so when it finishes then the
write_row() was entirely successful
*/
@@ -970,16 +1052,29 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
such case the flag is ignored for constructing binlog event.
*/
DBUG_ASSERT(thd->killed != KILL_BAD_DATA || error > 0);
- if (thd->binlog_query(THD::ROW_QUERY_TYPE,
- thd->query(), thd->query_length(),
- transactional_table, FALSE,
- errcode))
+ if (was_insert_delayed && table_list->lock_type == TL_WRITE)
{
- error=1;
- }
+ /* Binlog INSERT DELAYED as INSERT without DELAYED. */
+ String log_query;
+ if (create_insert_stmt_from_insert_delayed(thd, &log_query))
+ {
+ sql_print_error("Event Error: An error occurred while creating query string"
+ "for INSERT DELAYED stmt, before writing it into binary log.");
+
+ error= 1;
+ }
+ else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
+ log_query.c_ptr(), log_query.length(),
+ transactional_table, FALSE, FALSE,
+ errcode))
+ error= 1;
+ }
+ else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
+ thd->query(), thd->query_length(),
+ transactional_table, FALSE, FALSE,
+ errcode))
+ error= 1;
}
- if (thd->transaction.stmt.modified_non_trans_table)
- thd->transaction.all.modified_non_trans_table= TRUE;
}
DBUG_ASSERT(transactional_table || !changed ||
thd->transaction.stmt.modified_non_trans_table);
@@ -1010,13 +1105,13 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
if (error)
goto abort;
- if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
+ if (values_list.elements == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
!thd->cuted_fields))
{
- thd->row_count_func= info.copied + info.deleted +
- ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
- info.touched : info.updated);
- my_ok(thd, (ulong) thd->row_count_func, id);
+ my_ok(thd, info.copied + info.deleted +
+ ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
+ info.touched : info.updated),
+ id);
}
else
{
@@ -1026,12 +1121,13 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
if (ignore)
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
(lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
- (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
+ (ulong) (info.records - info.copied),
+ (ulong) thd->warning_info->statement_warn_count());
else
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
- (ulong) (info.deleted + updated), (ulong) thd->cuted_fields);
- thd->row_count_func= info.copied + info.deleted + updated;
- ::my_ok(thd, (ulong) thd->row_count_func, id, buff);
+ (ulong) (info.deleted + updated),
+ (ulong) thd->warning_info->statement_warn_count());
+ ::my_ok(thd, info.copied + info.deleted + updated, id, buff);
}
thd->abort_on_warning= 0;
if (thd->lex->current_select->first_cond_optimization)
@@ -1096,7 +1192,7 @@ static bool check_view_insertability(THD * thd, TABLE_LIST *view)
DBUG_ASSERT(view->table != 0 && view->field_translation != 0);
- VOID(bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0));
+ (void) bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0);
bitmap_clear_all(&used_fields);
view->contain_auto_increment= 0;
@@ -1350,8 +1446,8 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
thd->abort_on_warning= saved_abort_on_warning;
}
- if (!res)
- res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
+ if (!res)
+ res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
if (!res && duplic == DUP_UPDATE)
{
@@ -1409,6 +1505,23 @@ bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
static int last_uniq_key(TABLE *table,uint keynr)
{
+ /*
+ When an underlying storage engine informs that the unique key
+ conflicts are not reported in the ascending order by setting
+ the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this
+ information to determine the last key conflict.
+
+ The information about the last key conflict will be used to
+ do a replace of the new row on the conflicting row, rather
+ than doing a delete (of old row) + insert (of new row).
+
+ Hence check for this flag and disable replacing the last row
+ by returning 0 always. Returning 0 will result in doing
+ a delete + insert always.
+ */
+ if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER)
+ return 0;
+
while (++keynr < table->s->keys)
if (table->key_info[keynr].flags & HA_NOSAME)
return 0;
@@ -1565,9 +1678,6 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
goto before_trg_err;
table->file->restore_auto_increment(prev_insert_id);
- if (table->next_number_field)
- table->file->adjust_next_insert_id_after_explicit_value(
- table->next_number_field->val_int());
info->touched++;
if (!records_are_comparable(table) || compare_record(table))
{
@@ -1605,8 +1715,6 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
if (table->next_number_field)
table->file->adjust_next_insert_id_after_explicit_value(
table->next_number_field->val_int());
- info->touched++;
-
goto ok_or_after_trg_err;
}
else /* DUP_REPLACE */
@@ -1782,7 +1890,7 @@ public:
enum_duplicates dup;
my_time_t start_time;
ulong start_time_sec_part;
- ulong sql_mode;
+ ulonglong sql_mode;
bool auto_increment_field_not_null;
bool query_start_used, ignore, log_query, query_start_sec_part_used;
bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
@@ -1801,8 +1909,8 @@ public:
{}
~delayed_row()
{
- x_free(query.str);
- x_free(record);
+ my_free(query.str);
+ my_free(record);
}
};
@@ -1819,50 +1927,66 @@ class Delayed_insert :public ilink {
public:
THD thd;
TABLE *table;
- pthread_mutex_t mutex;
- pthread_cond_t cond,cond_client;
+ mysql_mutex_t mutex;
+ mysql_cond_t cond, cond_client;
volatile uint tables_in_use,stacked_inserts;
- volatile bool status,dead;
+ volatile bool status;
+ /**
+ When the handler thread starts, it clones a metadata lock ticket
+ which protects against GRL and ticket for the table to be inserted.
+ This is done to allow the deadlock detector to detect deadlocks
+ resulting from these locks.
+ Before this is done, the connection thread cannot safely exit
+ without causing problems for clone_ticket().
+ Once handler_thread_initialized has been set, it is safe for the
+ connection thread to exit.
+ Access to handler_thread_initialized is protected by di->mutex.
+ */
+ bool handler_thread_initialized;
COPY_INFO info;
I_List<delayed_row> rows;
ulong group_count;
TABLE_LIST table_list; // Argument
+ /**
+ Request for IX metadata lock protecting against GRL which is
+ passed from connection thread to the handler thread.
+ */
+ MDL_request grl_protection;
Delayed_insert()
- :locks_in_memory(0),
- table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
- group_count(0)
+ :locks_in_memory(0), table(0),tables_in_use(0),stacked_inserts(0),
+ status(0), handler_thread_initialized(FALSE), group_count(0)
{
+ DBUG_ENTER("Delayed_insert constructor");
thd.security_ctx->user=(char*) delayed_user;
thd.security_ctx->host=(char*) my_localhost;
strmake(thd.security_ctx->priv_user, thd.security_ctx->user,
USERNAME_LENGTH);
thd.current_tablenr=0;
- thd.version=refresh_version;
thd.command=COM_DELAYED_INSERT;
thd.lex->current_select= 0; // for my_message_sql
thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
/*
- Statement-based replication of INSERT DELAYED has problems with RAND()
- and user vars, so in mixed mode we go to row-based.
+ Prevent changes to global.lock_wait_timeout from affecting
+ delayed insert threads as any timeouts in delayed inserts
+ are not communicated to the client.
*/
- thd.lex->set_stmt_unsafe();
- thd.set_current_stmt_binlog_row_based_if_mixed();
+ thd.variables.lock_wait_timeout= LONG_TIMEOUT;
bzero((char*) &thd.net, sizeof(thd.net)); // Safety
bzero((char*) &table_list, sizeof(table_list)); // Safety
thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
thd.security_ctx->host_or_ip= "";
bzero((char*) &info,sizeof(info));
- my_pthread_mutex_init(&mutex, MY_MUTEX_INIT_FAST, "Delayed_insert::mutex",
- 0);
- pthread_cond_init(&cond,NULL);
- pthread_cond_init(&cond_client,NULL);
- VOID(pthread_mutex_lock(&LOCK_thread_count));
+ mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
+ mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
+ mysql_mutex_lock(&LOCK_thread_count);
delayed_insert_threads++;
delayed_lock= global_system_variables.low_priority_updates ?
TL_WRITE_LOW_PRIORITY : TL_WRITE;
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
+ mysql_mutex_unlock(&LOCK_thread_count);
+ DBUG_VOID_RETURN;
}
~Delayed_insert()
{
@@ -1871,18 +1995,21 @@ public:
while ((row=rows.get()))
delete row;
if (table)
+ {
close_thread_tables(&thd);
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- pthread_mutex_destroy(&mutex);
- pthread_cond_destroy(&cond);
- pthread_cond_destroy(&cond_client);
+ thd.mdl_context.release_transactional_locks();
+ }
+ mysql_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_destroy(&mutex);
+ mysql_cond_destroy(&cond);
+ mysql_cond_destroy(&cond_client);
thd.unlink(); // Must be unlinked under lock
- x_free(thd.query());
+ my_free(thd.query());
thd.security_ctx->user= thd.security_ctx->host=0;
thread_count--;
delayed_insert_threads--;
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
- VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
+ mysql_mutex_unlock(&LOCK_thread_count);
+ mysql_cond_broadcast(&COND_thread_count); /* Tell main we are ready */
}
/* The following is for checking when we can delete ourselves */
@@ -1892,22 +2019,23 @@ public:
}
void unlock()
{
- pthread_mutex_lock(&LOCK_delayed_insert);
+ mysql_mutex_lock(&LOCK_delayed_insert);
if (!--locks_in_memory)
{
- pthread_mutex_lock(&mutex);
+ mysql_mutex_lock(&mutex);
if (thd.killed && ! stacked_inserts && ! tables_in_use)
{
- pthread_cond_signal(&cond);
+ mysql_cond_signal(&cond);
status=1;
}
- pthread_mutex_unlock(&mutex);
+ mysql_mutex_unlock(&mutex);
}
- pthread_mutex_unlock(&LOCK_delayed_insert);
+ mysql_mutex_unlock(&LOCK_delayed_insert);
}
inline uint lock_count() { return locks_in_memory; }
TABLE* get_local_table(THD* client_thd);
+ bool open_and_lock_table();
bool handle_inserts(void);
};
@@ -1924,7 +2052,7 @@ static
Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
{
thd_proc_info(thd, "waiting for delay_list");
- pthread_mutex_lock(&LOCK_delayed_insert); // Protect master list
+ mysql_mutex_lock(&LOCK_delayed_insert); // Protect master list
I_List_iterator<Delayed_insert> it(delayed_threads);
Delayed_insert *di;
while ((di= it++))
@@ -1936,7 +2064,7 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
break;
}
}
- pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
+ mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
return di;
}
@@ -1981,13 +2109,13 @@ Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
a given consumer (delayed insert thread), only at different
stages of producer-consumer relationship.
- 'dead' and 'status' variables in Delayed_insert are redundant
- too, since there is already 'di->thd.killed' and
- di->stacked_inserts.
+ The 'status' variable in Delayed_insert is redundant
+ too, since there is already di->stacked_inserts.
*/
static
-bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
+bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
+ TABLE_LIST *table_list)
{
int error;
Delayed_insert *di;
@@ -2006,7 +2134,7 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
DBUG_RETURN(0);
thd_proc_info(thd, "Creating delayed handler");
- pthread_mutex_lock(&LOCK_delayed_create);
+ mysql_mutex_lock(&LOCK_delayed_create);
/*
The first search above was done without LOCK_delayed_create.
Another thread might have created the handler in between. Search again.
@@ -2014,55 +2142,71 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
if (! (di= find_handler(thd, table_list)))
{
if (!(di= new Delayed_insert()))
- {
- thd->fatal_error();
goto end_create;
- }
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
thread_count++;
- pthread_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
/*
Annotating delayed inserts is not supported.
*/
di->thd.variables.binlog_annotate_row_events= 0;
di->thd.set_db(table_list->db, (uint) strlen(table_list->db));
- di->thd.set_query(my_strdup(table_list->table_name, MYF(MY_WME)), 0);
+ di->thd.set_query(my_strdup(table_list->table_name,
+ MYF(MY_WME | ME_FATALERROR)),
+ 0, system_charset_info);
if (di->thd.db == NULL || di->thd.query() == NULL)
{
/* The error is reported */
delete di;
- thd->fatal_error();
goto end_create;
}
di->table_list= *table_list; // Needed to open table
/* Replace volatile strings with local copies */
di->table_list.alias= di->table_list.table_name= di->thd.query();
di->table_list.db= di->thd.db;
+ /* We need the tickets so that they can be cloned in handle_delayed_insert */
+ di->grl_protection.init(MDL_key::GLOBAL, "", "",
+ MDL_INTENTION_EXCLUSIVE, MDL_STATEMENT);
+ di->grl_protection.ticket= grl_protection_request->ticket;
+ init_mdl_requests(&di->table_list);
+ di->table_list.mdl_request.ticket= table_list->mdl_request.ticket;
+
di->lock();
- pthread_mutex_lock(&di->mutex);
- if ((error= pthread_create(&di->thd.real_id, &connection_attrib,
- handle_delayed_insert, (void*) di)))
+ mysql_mutex_lock(&di->mutex);
+ if ((error= mysql_thread_create(key_thread_delayed_insert,
+ &di->thd.real_id, &connection_attrib,
+ handle_delayed_insert, (void*) di)))
{
DBUG_PRINT("error",
("Can't create thread to handle delayed insert (error %d)",
error));
- pthread_mutex_unlock(&di->mutex);
+ mysql_mutex_unlock(&di->mutex);
di->unlock();
delete di;
- my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
- thd->fatal_error();
+ my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATALERROR), error);
goto end_create;
}
- /* Wait until table is open */
+ /*
+ Wait until table is open unless the handler thread or the connection
+ thread has been killed. Note that we in all cases must wait until the
+ handler thread has been properly initialized before exiting. Otherwise
+ we risk doing clone_ticket() on a ticket that is no longer valid.
+ */
thd_proc_info(thd, "waiting for handler open");
- while (!di->thd.killed && !di->table && !thd->killed)
+ while (!di->handler_thread_initialized ||
+ (!di->thd.killed && !di->table && !thd->killed))
{
- pthread_cond_wait(&di->cond_client, &di->mutex);
+ mysql_cond_wait(&di->cond_client, &di->mutex);
}
- pthread_mutex_unlock(&di->mutex);
+ mysql_mutex_unlock(&di->mutex);
thd_proc_info(thd, "got old table");
+ if (thd->killed)
+ {
+ di->unlock();
+ goto end_create;
+ }
if (di->thd.killed)
{
if (di->thd.is_error())
@@ -2070,30 +2214,29 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
/*
Copy the error message. Note that we don't treat fatal
errors in the delayed thread as fatal errors in the
- main thread. Use of my_message will enable stored
- procedures continue handlers.
+ main thread. If delayed thread was killed, we don't
+ want to send "Server shutdown in progress" in the
+ INSERT THREAD.
*/
- my_message(di->thd.main_da.sql_errno(), di->thd.main_da.message(),
- MYF(0));
- }
- di->unlock();
+ if (di->thd.stmt_da->sql_errno() == ER_SERVER_SHUTDOWN)
+ my_message(ER_QUERY_INTERRUPTED, ER(ER_QUERY_INTERRUPTED), MYF(0));
+ else
+ my_message(di->thd.stmt_da->sql_errno(), di->thd.stmt_da->message(),
+ MYF(0));
+ }
+ di->unlock();
goto end_create;
}
- if (thd->killed)
- {
- di->unlock();
- goto end_create;
- }
- pthread_mutex_lock(&LOCK_delayed_insert);
+ mysql_mutex_lock(&LOCK_delayed_insert);
delayed_threads.append(di);
- pthread_mutex_unlock(&LOCK_delayed_insert);
+ mysql_mutex_unlock(&LOCK_delayed_insert);
}
- pthread_mutex_unlock(&LOCK_delayed_create);
+ mysql_mutex_unlock(&LOCK_delayed_create);
}
- pthread_mutex_lock(&di->mutex);
+ mysql_mutex_lock(&di->mutex);
table_list->table= di->get_local_table(thd);
- pthread_mutex_unlock(&di->mutex);
+ mysql_mutex_unlock(&di->mutex);
if (table_list->table)
{
DBUG_ASSERT(! thd->is_error());
@@ -2104,7 +2247,7 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
DBUG_RETURN((table_list->table == NULL));
end_create:
- pthread_mutex_unlock(&LOCK_delayed_create);
+ mysql_mutex_unlock(&LOCK_delayed_create);
DBUG_RETURN(thd->is_error());
}
@@ -2141,17 +2284,33 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd)
if (!thd.lock) // Table is not locked
{
thd_proc_info(client_thd, "waiting for handler lock");
- pthread_cond_signal(&cond); // Tell handler to lock table
- while (!dead && !thd.lock && ! client_thd->killed)
+ mysql_cond_signal(&cond); // Tell handler to lock table
+ while (!thd.killed && !thd.lock && ! client_thd->killed)
{
- pthread_cond_wait(&cond_client,&mutex);
+ mysql_cond_wait(&cond_client, &mutex);
}
thd_proc_info(client_thd, "got handler lock");
if (client_thd->killed)
goto error;
- if (dead)
+ if (thd.killed)
{
- my_message(thd.main_da.sql_errno(), thd.main_da.message(), MYF(0));
+ /*
+ Copy the error message. Note that we don't treat fatal
+ errors in the delayed thread as fatal errors in the
+ main thread. If delayed thread was killed, we don't
+ want to send "Server shutdown in progress" in the
+ INSERT THREAD.
+
+ The thread could be killed with an error message if
+ di->handle_inserts() or di->open_and_lock_table() fails.
+ The thread could be killed without an error message if
+ killed using mysql_notify_thread_having_shared_lock() or
+ kill_delayed_threads_for_table().
+ */
+ if (!thd.is_error() || thd.stmt_da->sql_errno() == ER_SERVER_SHUTDOWN)
+ my_message(ER_QUERY_INTERRUPTED, ER(ER_QUERY_INTERRUPTED), MYF(0));
+ else
+ my_message(thd.stmt_da->sql_errno(), thd.stmt_da->message(), MYF(0));
goto error;
}
}
@@ -2235,7 +2394,7 @@ TABLE *Delayed_insert::get_local_table(THD* client_thd)
error:
tables_in_use--;
status=1;
- pthread_cond_signal(&cond); // Inform thread about abort
+ mysql_cond_signal(&cond); // Inform thread about abort
DBUG_RETURN(0);
}
@@ -2254,9 +2413,9 @@ int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
(ulong) query.length));
thd_proc_info(thd, "waiting for handler insert");
- pthread_mutex_lock(&di->mutex);
+ mysql_mutex_lock(&di->mutex);
while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
- pthread_cond_wait(&di->cond_client,&di->mutex);
+ mysql_cond_wait(&di->cond_client, &di->mutex);
thd_proc_info(thd, "storing row into queue");
if (thd->killed)
@@ -2278,7 +2437,7 @@ int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
row= new delayed_row(query, duplic, ignore, log_on);
if (row == NULL)
{
- my_free(query.str, MYF(MY_WME));
+ my_free(query.str);
goto err;
}
@@ -2333,15 +2492,15 @@ int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
di->status=1;
if (table->s->blob_fields)
unlink_blobs(table);
- pthread_cond_signal(&di->cond);
+ mysql_cond_signal(&di->cond);
thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
- pthread_mutex_unlock(&di->mutex);
+ mysql_mutex_unlock(&di->mutex);
DBUG_RETURN(0);
err:
delete row;
- pthread_mutex_unlock(&di->mutex);
+ mysql_mutex_unlock(&di->mutex);
DBUG_RETURN(1);
}
@@ -2354,14 +2513,14 @@ static void end_delayed_insert(THD *thd)
{
DBUG_ENTER("end_delayed_insert");
Delayed_insert *di=thd->di;
- pthread_mutex_lock(&di->mutex);
+ mysql_mutex_lock(&di->mutex);
DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
if (!--di->tables_in_use || di->thd.killed)
{ // Unlock table
di->status=1;
- pthread_cond_signal(&di->cond);
+ mysql_cond_signal(&di->cond);
}
- pthread_mutex_unlock(&di->mutex);
+ mysql_mutex_unlock(&di->mutex);
DBUG_VOID_RETURN;
}
@@ -2370,17 +2529,17 @@ static void end_delayed_insert(THD *thd)
void kill_delayed_threads(void)
{
- VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list
+ mysql_mutex_lock(&LOCK_delayed_insert); // For unlink from list
I_List_iterator<Delayed_insert> it(delayed_threads);
Delayed_insert *di;
while ((di= it++))
{
- di->thd.killed= KILL_SYSTEM_THREAD;
- pthread_mutex_lock(&di->thd.LOCK_thd_data);
+ di->thd.killed= KILL_CONNECTION;
+ mysql_mutex_lock(&di->thd.LOCK_thd_data);
if (di->thd.mysys_var)
{
- pthread_mutex_lock(&di->thd.mysys_var->mutex);
+ mysql_mutex_lock(&di->thd.mysys_var->mutex);
if (di->thd.mysys_var->current_cond)
{
/*
@@ -2388,212 +2547,114 @@ void kill_delayed_threads(void)
in handle_delayed_insert()
*/
if (&di->mutex != di->thd.mysys_var->current_mutex)
- my_pthread_mutex_lock(di->thd.mysys_var->current_mutex,
- MYF_NO_DEADLOCK_DETECTION);
- pthread_cond_broadcast(di->thd.mysys_var->current_cond);
+ mysql_mutex_lock(di->thd.mysys_var->current_mutex);
+ mysql_cond_broadcast(di->thd.mysys_var->current_cond);
if (&di->mutex != di->thd.mysys_var->current_mutex)
- pthread_mutex_unlock(di->thd.mysys_var->current_mutex);
+ mysql_mutex_unlock(di->thd.mysys_var->current_mutex);
}
- pthread_mutex_unlock(&di->thd.mysys_var->mutex);
+ mysql_mutex_unlock(&di->thd.mysys_var->mutex);
}
- pthread_mutex_unlock(&di->thd.LOCK_thd_data);
+ mysql_mutex_unlock(&di->thd.LOCK_thd_data);
}
- VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
+ mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
}
-static void handle_delayed_insert_impl(THD *thd, Delayed_insert *di)
+/**
+ A strategy for the prelocking algorithm which prevents the
+ delayed insert thread from opening tables with engines which
+ do not support delayed inserts.
+
+ Particularly it allows to abort open_tables() as soon as we
+ discover that we have opened a MERGE table, without acquiring
+ metadata locks on underlying tables.
+*/
+
+class Delayed_prelocking_strategy : public Prelocking_strategy
{
- DBUG_ENTER("handle_delayed_insert_impl");
- thd->thread_stack= (char*) &thd;
- if (init_thr_lock() || thd->store_globals())
- {
- /* Can't use my_error since store_globals has perhaps failed */
- thd->main_da.set_error_status(thd, ER_OUT_OF_RESOURCES,
- ER(ER_OUT_OF_RESOURCES));
- thd->fatal_error();
- goto err;
- }
+public:
+ virtual bool handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
+ Sroutine_hash_entry *rt, sp_head *sp,
+ bool *need_prelocking);
+ virtual bool handle_table(THD *thd, Query_tables_list *prelocking_ctx,
+ TABLE_LIST *table_list, bool *need_prelocking);
+ virtual bool handle_view(THD *thd, Query_tables_list *prelocking_ctx,
+ TABLE_LIST *table_list, bool *need_prelocking);
+};
- /*
- Open table requires an initialized lex in case the table is
- partitioned. The .frm file contains a partial SQL string which is
- parsed using a lex, that depends on initialized thd->lex.
- */
- lex_start(thd);
- thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
- /*
- Statement-based replication of INSERT DELAYED has problems with RAND()
- and user vars, so in mixed mode we go to row-based.
- */
- thd->lex->set_stmt_unsafe();
- thd->set_current_stmt_binlog_row_based_if_mixed();
- /* Open table */
- if (!(di->table= open_n_lock_single_table(thd, &di->table_list,
- TL_WRITE_DELAYED)))
- {
- thd->fatal_error(); // Abort waiting inserts
- goto err;
- }
- if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
- {
- thd->fatal_error();
- my_error(ER_DELAYED_NOT_SUPPORTED, MYF(0), di->table_list.table_name);
- goto err;
- }
- if (di->table->triggers)
+bool Delayed_prelocking_strategy::
+handle_table(THD *thd, Query_tables_list *prelocking_ctx,
+ TABLE_LIST *table_list, bool *need_prelocking)
+{
+ DBUG_ASSERT(table_list->lock_type == TL_WRITE_DELAYED);
+
+ if (!(table_list->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
{
- /*
- Table has triggers. This is not an error, but we do
- not support triggers with delayed insert. Terminate the delayed
- thread without an error and thus request lock upgrade.
- */
- goto err;
+ my_error(ER_DELAYED_NOT_SUPPORTED, MYF(0), table_list->table_name);
+ return TRUE;
}
- di->table->copy_blobs=1;
+ return FALSE;
+}
- /* Tell client that the thread is initialized */
- pthread_cond_signal(&di->cond_client);
- /* Now wait until we get an insert or lock to handle */
- /* We will not abort as long as a client thread uses this thread */
+bool Delayed_prelocking_strategy::
+handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
+ Sroutine_hash_entry *rt, sp_head *sp,
+ bool *need_prelocking)
+{
+ /* LEX used by the delayed insert thread has no routines. */
+ DBUG_ASSERT(0);
+ return FALSE;
+}
- for (;;)
- {
- if (thd->killed >= KILL_CONNECTION)
- {
- uint lock_count;
- /*
- Remove this from delay insert list so that no one can request a
- table from this
- */
- pthread_mutex_unlock(&di->mutex);
- pthread_mutex_lock(&LOCK_delayed_insert);
- di->unlink();
- lock_count=di->lock_count();
- pthread_mutex_unlock(&LOCK_delayed_insert);
- pthread_mutex_lock(&di->mutex);
- if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
- break; // Time to die
- }
- if (!di->status && !di->stacked_inserts)
- {
- struct timespec abstime;
- set_timespec(abstime, delayed_insert_timeout);
+bool Delayed_prelocking_strategy::
+handle_view(THD *thd, Query_tables_list *prelocking_ctx,
+ TABLE_LIST *table_list, bool *need_prelocking)
+{
+ /* We don't open views in the delayed insert thread. */
+ DBUG_ASSERT(0);
+ return FALSE;
+}
- /* Information for pthread_kill */
- di->thd.mysys_var->current_mutex= &di->mutex;
- di->thd.mysys_var->current_cond= &di->cond;
- thd_proc_info(&(di->thd), "Waiting for INSERT");
- DBUG_PRINT("info",("Waiting for someone to insert rows"));
- while (!thd->killed)
- {
- int error;
-#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
- error=pthread_cond_wait(&di->cond,&di->mutex);
-#else
- error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
-#ifdef EXTRA_DEBUG
- if (error && error != EINTR && error != ETIMEDOUT)
- {
- fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
- DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
- error));
- }
-#endif
-#endif
- if (thd->killed || di->status)
- break;
- if (error == ETIMEDOUT || error == ETIME)
- {
- thd->killed= KILL_SYSTEM_THREAD;
- break;
- }
- }
- /* We can't lock di->mutex and mysys_var->mutex at the same time */
- pthread_mutex_unlock(&di->mutex);
- pthread_mutex_lock(&di->thd.mysys_var->mutex);
- di->thd.mysys_var->current_mutex= 0;
- di->thd.mysys_var->current_cond= 0;
- pthread_mutex_unlock(&di->thd.mysys_var->mutex);
- pthread_mutex_lock(&di->mutex);
- }
- thd_proc_info(&(di->thd), 0);
+/**
+ Open and lock table for use by delayed thread and check that
+ this table is suitable for delayed inserts.
- if (di->tables_in_use && ! thd->lock)
- {
- bool not_used;
- /*
- Request for new delayed insert.
- Lock the table, but avoid to be blocked by a global read lock.
- If we got here while a global read lock exists, then one or more
- inserts started before the lock was requested. These are allowed
- to complete their work before the server returns control to the
- client which requested the global read lock. The delayed insert
- handler will close the table and finish when the outstanding
- inserts are done.
- */
- if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
- MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
- &not_used)))
- {
- /* Fatal error */
- di->dead= 1;
- thd->killed= KILL_SYSTEM_THREAD;
- }
- pthread_cond_broadcast(&di->cond_client);
- }
- if (di->stacked_inserts)
- {
- if (di->handle_inserts())
- {
- /* Some fatal error */
- di->dead= 1;
- thd->killed= KILL_SYSTEM_THREAD;
- }
- }
- di->status=0;
- if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
- {
- /*
- No one is doing a insert delayed
- Unlock table so that other threads can use it
- */
- MYSQL_LOCK *lock=thd->lock;
- thd->lock=0;
- pthread_mutex_unlock(&di->mutex);
- /*
- We need to release next_insert_id before unlocking. This is
- enforced by handler::ha_external_lock().
- */
- di->table->file->ha_release_auto_increment();
- mysql_unlock_tables(thd, lock);
- ha_autocommit_or_rollback(thd, 0);
- di->group_count=0;
- pthread_mutex_lock(&di->mutex);
- }
- if (di->tables_in_use)
- pthread_cond_broadcast(&di->cond_client); // If waiting clients
- }
+ @retval FALSE - Success.
+ @retval TRUE - Failure.
+*/
+
+bool Delayed_insert::open_and_lock_table()
+{
+ Delayed_prelocking_strategy prelocking_strategy;
-err:
/*
- mysql_lock_tables() can potentially start a transaction and write
- a table map. In the event of an error, that transaction has to be
- rolled back. We only need to roll back a potential statement
- transaction, since real transactions are rolled back in
- close_thread_tables().
-
- TODO: This is not true any more, table maps are generated on the
- first call to ha_*_row() instead. Remove code that are used to
- cover for the case outlined above.
- */
- ha_autocommit_or_rollback(thd, 1);
+ Use special prelocking strategy to get ER_DELAYED_NOT_SUPPORTED
+ error for tables with engines which don't support delayed inserts.
+ */
+ if (!(table= open_n_lock_single_table(&thd, &table_list,
+ TL_WRITE_DELAYED,
+ MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK,
+ &prelocking_strategy)))
+ {
+ thd.fatal_error(); // Abort waiting inserts
+ return TRUE;
+ }
- DBUG_VOID_RETURN;
+ if (table->triggers)
+ {
+ /*
+ Table has triggers. This is not an error, but we do
+ not support triggers with delayed insert. Terminate the delayed
+ thread without an error and thus request lock upgrade.
+ */
+ return TRUE;
+ }
+ table->copy_blobs= 1;
+ return FALSE;
}
@@ -2608,55 +2669,217 @@ pthread_handler_t handle_delayed_insert(void *arg)
pthread_detach_this_thread();
/* Add thread to THD list so that's it's visible in 'show processlist' */
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
thd->set_current_time();
threads.append(thd);
- thd->killed=abort_loop ? KILL_SYSTEM_THREAD : NOT_KILLED;
- pthread_mutex_unlock(&LOCK_thread_count);
+ thd->killed=abort_loop ? KILL_CONNECTION : NOT_KILLED;
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ mysql_thread_set_psi_id(thd->thread_id);
/*
- Wait until the client runs into pthread_cond_wait(),
+ Wait until the client runs into mysql_cond_wait(),
where we free it after the table is opened and di linked in the list.
If we did not wait here, the client might detect the opened table
before it is linked to the list. It would release LOCK_delayed_create
and allow another thread to create another handler for the same table,
since it does not find one in the list.
*/
- pthread_mutex_lock(&di->mutex);
-#if !defined( __WIN__) /* Win32 calls this in pthread_create */
+ mysql_mutex_lock(&di->mutex);
if (my_thread_init())
{
/* Can't use my_error since store_globals has not yet been called */
- thd->main_da.set_error_status(thd, ER_OUT_OF_RESOURCES,
- ER(ER_OUT_OF_RESOURCES));
- goto end;
+ thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
+ ER(ER_OUT_OF_RESOURCES), NULL);
+ di->handler_thread_initialized= TRUE;
}
-#endif
+ else
+ {
+ DBUG_ENTER("handle_delayed_insert");
+ thd->thread_stack= (char*) &thd;
+ if (init_thr_lock() || thd->store_globals())
+ {
+ /* Can't use my_error since store_globals has perhaps failed */
+ thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
+ ER(ER_OUT_OF_RESOURCES), NULL);
+ di->handler_thread_initialized= TRUE;
+ thd->fatal_error();
+ goto err;
+ }
+
+ thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
+
+ /*
+ INSERT DELAYED has to go to row-based format because the time
+ at which rows are inserted cannot be determined in mixed mode.
+ */
+ thd->set_current_stmt_binlog_format_row_if_mixed();
+
+ /*
+ Clone tickets representing protection against GRL and the lock on
+ the target table for the insert and add them to the list of granted
+ metadata locks held by the handler thread. This is safe since the
+ handler thread is not holding nor waiting on any metadata locks.
+ */
+ if (thd->mdl_context.clone_ticket(&di->grl_protection) ||
+ thd->mdl_context.clone_ticket(&di->table_list.mdl_request))
+ {
+ thd->mdl_context.release_transactional_locks();
+ di->handler_thread_initialized= TRUE;
+ goto err;
+ }
+
+ /*
+ Now that the ticket has been cloned, it is safe for the connection
+ thread to exit.
+ */
+ di->handler_thread_initialized= TRUE;
+ di->table_list.mdl_request.ticket= NULL;
+
+ if (di->open_and_lock_table())
+ goto err;
+
+ /* Tell client that the thread is initialized */
+ mysql_cond_signal(&di->cond_client);
+
+ /* Now wait until we get an insert or lock to handle */
+ /* We will not abort as long as a client thread uses this thread */
+
+ for (;;)
+ {
+ if (thd->killed)
+ {
+ uint lock_count;
+ /*
+ Remove this from delay insert list so that no one can request a
+ table from this
+ */
+ mysql_mutex_unlock(&di->mutex);
+ mysql_mutex_lock(&LOCK_delayed_insert);
+ di->unlink();
+ lock_count=di->lock_count();
+ mysql_mutex_unlock(&LOCK_delayed_insert);
+ mysql_mutex_lock(&di->mutex);
+ if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
+ break; // Time to die
+ }
+
+ /* Shouldn't wait if killed or an insert is waiting. */
+ if (!thd->killed && !di->status && !di->stacked_inserts)
+ {
+ struct timespec abstime;
+ set_timespec(abstime, delayed_insert_timeout);
- handle_delayed_insert_impl(thd, di);
+ /* Information for pthread_kill */
+ di->thd.mysys_var->current_mutex= &di->mutex;
+ di->thd.mysys_var->current_cond= &di->cond;
+ thd_proc_info(&(di->thd), "Waiting for INSERT");
-#ifndef __WIN__
-end:
+ DBUG_PRINT("info",("Waiting for someone to insert rows"));
+ while (!thd->killed && !di->status)
+ {
+ int error;
+ mysql_audit_release(thd);
+#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
+ error= mysql_cond_wait(&di->cond, &di->mutex);
+#else
+ error= mysql_cond_timedwait(&di->cond, &di->mutex, &abstime);
+#ifdef EXTRA_DEBUG
+ if (error && error != EINTR && error != ETIMEDOUT)
+ {
+ fprintf(stderr, "Got error %d from mysql_cond_timedwait\n", error);
+ DBUG_PRINT("error", ("Got error %d from mysql_cond_timedwait",
+ error));
+ }
#endif
- /*
- di should be unlinked from the thread handler list and have no active
- clients
- */
+#endif
+ if (error == ETIMEDOUT || error == ETIME)
+ thd->killed= KILL_CONNECTION;
+ }
+ /* We can't lock di->mutex and mysys_var->mutex at the same time */
+ mysql_mutex_unlock(&di->mutex);
+ mysql_mutex_lock(&di->thd.mysys_var->mutex);
+ di->thd.mysys_var->current_mutex= 0;
+ di->thd.mysys_var->current_cond= 0;
+ mysql_mutex_unlock(&di->thd.mysys_var->mutex);
+ mysql_mutex_lock(&di->mutex);
+ }
+ thd_proc_info(&(di->thd), 0);
+
+ if (di->tables_in_use && ! thd->lock && !thd->killed)
+ {
+ /*
+ Request for new delayed insert.
+ Lock the table, but avoid to be blocked by a global read lock.
+ If we got here while a global read lock exists, then one or more
+ inserts started before the lock was requested. These are allowed
+ to complete their work before the server returns control to the
+ client which requested the global read lock. The delayed insert
+ handler will close the table and finish when the outstanding
+ inserts are done.
+ */
+ if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1, 0)))
+ {
+ /* Fatal error */
+ thd->killed= KILL_CONNECTION;
+ }
+ mysql_cond_broadcast(&di->cond_client);
+ }
+ if (di->stacked_inserts)
+ {
+ if (di->handle_inserts())
+ {
+ /* Some fatal error */
+ thd->killed= KILL_CONNECTION;
+ }
+ }
+ di->status=0;
+ if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
+ {
+ /*
+ No one is doing a insert delayed
+ Unlock table so that other threads can use it
+ */
+ MYSQL_LOCK *lock=thd->lock;
+ thd->lock=0;
+ mysql_mutex_unlock(&di->mutex);
+ /*
+ We need to release next_insert_id before unlocking. This is
+ enforced by handler::ha_external_lock().
+ */
+ di->table->file->ha_release_auto_increment();
+ mysql_unlock_tables(thd, lock);
+ trans_commit_stmt(thd);
+ di->group_count=0;
+ mysql_audit_release(thd);
+ mysql_mutex_lock(&di->mutex);
+ }
+ if (di->tables_in_use)
+ mysql_cond_broadcast(&di->cond_client); // If waiting clients
+ }
+
+ err:
+ DBUG_LEAVE;
+ }
di->table=0;
- di->dead= 1; // If error
- thd->killed= KILL_SYSTEM_THREAD; // If error
- pthread_mutex_unlock(&di->mutex);
+ thd->killed= KILL_CONNECTION; // If error
+ mysql_mutex_unlock(&di->mutex);
close_thread_tables(thd); // Free the table
- pthread_cond_broadcast(&di->cond_client); // Safety
+ thd->mdl_context.release_transactional_locks();
+ mysql_cond_broadcast(&di->cond_client); // Safety
- pthread_mutex_lock(&LOCK_delayed_create); // Because of delayed_get_table
- pthread_mutex_lock(&LOCK_delayed_insert);
+ mysql_mutex_lock(&LOCK_delayed_create); // Because of delayed_get_table
+ mysql_mutex_lock(&LOCK_delayed_insert);
+ /*
+ di should be unlinked from the thread handler list and have no active
+ clients
+ */
delete di;
- pthread_mutex_unlock(&LOCK_delayed_insert);
- pthread_mutex_unlock(&LOCK_delayed_create);
+ mysql_mutex_unlock(&LOCK_delayed_insert);
+ mysql_mutex_unlock(&LOCK_delayed_create);
my_thread_end();
pthread_exit(0);
@@ -2686,7 +2909,7 @@ static void free_delayed_insert_blobs(register TABLE *table)
{
uchar *str;
((Field_blob *) (*ptr))->get_ptr(&str);
- my_free(str,MYF(MY_ALLOW_ZERO_PTR));
+ my_free(str);
((Field_blob *) (*ptr))->reset();
}
}
@@ -2697,19 +2920,21 @@ bool Delayed_insert::handle_inserts(void)
{
int error;
ulong max_rows;
+ bool has_trans = TRUE;
bool using_ignore= 0, using_opt_replace= 0,
using_bin_log= mysql_bin_log.is_open();
delayed_row *row;
DBUG_ENTER("handle_inserts");
/* Allow client to insert new rows */
- pthread_mutex_unlock(&mutex);
+ mysql_mutex_unlock(&mutex);
table->next_number_field=table->found_next_number_field;
table->use_all_columns();
thd_proc_info(&thd, "upgrading lock");
- if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock))
+ if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock,
+ thd.variables.lock_wait_timeout))
{
/*
This can happen if thread is killed either by a shutdown
@@ -2723,7 +2948,7 @@ bool Delayed_insert::handle_inserts(void)
thd_proc_info(&thd, "insert");
max_rows= delayed_insert_limit;
- if (thd.killed || table->needs_reopen_or_name_lock())
+ if (thd.killed || table->s->has_old_version())
{
thd.killed= KILL_SYSTEM_THREAD;
max_rows= ULONG_MAX; // Do as much as possible
@@ -2736,12 +2961,12 @@ bool Delayed_insert::handle_inserts(void)
*/
if (!using_bin_log)
table->file->extra(HA_EXTRA_WRITE_CACHE);
- pthread_mutex_lock(&mutex);
+ mysql_mutex_lock(&mutex);
while ((row=rows.get()))
{
stacked_inserts--;
- pthread_mutex_unlock(&mutex);
+ mysql_mutex_unlock(&mutex);
memcpy(table->record[0],row->record,table->s->reclength);
thd.start_time=row->start_time;
@@ -2759,6 +2984,13 @@ bool Delayed_insert::handle_inserts(void)
if (log_query)
{
/*
+ Guaranteed that the INSERT DELAYED STMT will not be here
+ in SBR when mysql binlog is enabled.
+ */
+ DBUG_ASSERT(!(mysql_bin_log.is_open() &&
+ !thd.is_current_stmt_binlog_format_row()));
+
+ /*
This is the first value of an INSERT statement.
It is the right place to clear a forced insert_id.
This is usually done after the last value of an INSERT statement,
@@ -2825,44 +3057,17 @@ bool Delayed_insert::handle_inserts(void)
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
}
- if (log_query && mysql_bin_log.is_open())
- {
- bool backup_time_zone_used = thd.time_zone_used;
- Time_zone *backup_time_zone = thd.variables.time_zone;
- if (row->time_zone != NULL)
- {
- thd.time_zone_used = true;
- thd.variables.time_zone = row->time_zone;
- }
-
- /* if the delayed insert was killed, the killed status is
- ignored while binlogging */
- int errcode= 0;
- if (thd.killed == NOT_KILLED)
- errcode= query_error_code(&thd, TRUE);
-
- /*
- If the query has several rows to insert, only the first row will come
- here. In row-based binlogging, this means that the first row will be
- written to binlog as one Table_map event and one Rows event (due to an
- event flush done in binlog_query()), then all other rows of this query
- will be binlogged together as one single Table_map event and one
- single Rows event.
- */
- if (thd.binlog_query(THD::ROW_QUERY_TYPE,
- row->query.str, row->query.length,
- FALSE, FALSE, errcode))
- goto err;
-
- thd.time_zone_used = backup_time_zone_used;
- thd.variables.time_zone = backup_time_zone;
- }
-
if (table->s->blob_fields)
free_delayed_insert_blobs(table);
thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
- pthread_mutex_lock(&mutex);
+ mysql_mutex_lock(&mutex);
+
+ /*
+ Reset the table->auto_increment_field_not_null as it is valid for
+ only one row.
+ */
+ table->auto_increment_field_not_null= FALSE;
delete row;
/*
@@ -2878,19 +3083,20 @@ bool Delayed_insert::handle_inserts(void)
if (stacked_inserts || tables_in_use) // Let these wait a while
{
if (tables_in_use)
- pthread_cond_broadcast(&cond_client); // If waiting clients
+ mysql_cond_broadcast(&cond_client); // If waiting clients
thd_proc_info(&thd, "reschedule");
- pthread_mutex_unlock(&mutex);
+ mysql_mutex_unlock(&mutex);
if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
{
/* This should never happen */
table->file->print_error(error,MYF(0));
- sql_print_error("%s", thd.main_da.message());
+ sql_print_error("%s", thd.stmt_da->message());
DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
goto err;
}
query_cache_invalidate3(&thd, table, 1);
- if (thr_reschedule_write_lock(*thd.lock->locks))
+ if (thr_reschedule_write_lock(*thd.lock->locks,
+ thd.variables.lock_wait_timeout))
{
/* This is not known to happen. */
my_error(ER_DELAYED_CANT_CHANGE_LOCK,
@@ -2900,15 +3106,15 @@ bool Delayed_insert::handle_inserts(void)
}
if (!using_bin_log)
table->file->extra(HA_EXTRA_WRITE_CACHE);
- pthread_mutex_lock(&mutex);
+ mysql_mutex_lock(&mutex);
thd_proc_info(&thd, "insert");
}
if (tables_in_use)
- pthread_cond_broadcast(&cond_client); // If waiting clients
+ mysql_cond_broadcast(&cond_client); // If waiting clients
}
}
thd_proc_info(&thd, 0);
- pthread_mutex_unlock(&mutex);
+ mysql_mutex_unlock(&mutex);
/*
We need to flush the pending event when using row-based
@@ -2921,20 +3127,22 @@ bool Delayed_insert::handle_inserts(void)
or trigger.
TODO: Move the logging to last in the sequence of rows.
- */
- if (thd.current_stmt_binlog_row_based &&
- thd.binlog_flush_pending_rows_event(TRUE))
+ */
+ has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE ||
+ table->file->has_transactions();
+ if (thd.is_current_stmt_binlog_format_row() &&
+ thd.binlog_flush_pending_rows_event(TRUE, has_trans))
goto err;
if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
{ // This shouldn't happen
table->file->print_error(error,MYF(0));
- sql_print_error("%s", thd.main_da.message());
+ sql_print_error("%s", thd.stmt_da->message());
DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
goto err;
}
query_cache_invalidate3(&thd, table, 1);
- pthread_mutex_lock(&mutex);
+ mysql_mutex_lock(&mutex);
DBUG_RETURN(0);
err:
@@ -2958,7 +3166,7 @@ bool Delayed_insert::handle_inserts(void)
}
DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
- pthread_mutex_lock(&mutex);
+ mysql_mutex_lock(&mutex);
DBUG_RETURN(1);
}
#endif /* EMBEDDED_LIBRARY */
@@ -2988,19 +3196,6 @@ bool mysql_insert_select_prepare(THD *thd)
/*
- Statement-based replication of INSERT ... SELECT ... LIMIT is not safe
- as order of rows is not defined, so in mixed mode we go to row-based.
-
- Note that we may consider a statement as safe if ORDER BY primary_key
- is present or we SELECT a constant. However it may confuse users to
- see very similiar statements replicated differently.
- */
- if (lex->current_select->select_limit)
- {
- lex->set_stmt_unsafe();
- thd->set_current_stmt_binlog_row_based_if_mixed();
- }
- /*
SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
clause if table is VIEW
*/
@@ -3089,6 +3284,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
res= (setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0) ||
check_insert_fields(thd, table_list, *fields, values,
!insert_into_view, 1, &map));
+
if (!res && fields->elements)
{
bool saved_abort_on_warning= thd->abort_on_warning;
@@ -3177,7 +3373,7 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
}
else if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
- !thd->prelocked_mode)
+ thd->locked_tables_mode <= LTM_LOCK_TABLES)
{
/*
We must not yet prepare the result table if it is the same as one of the
@@ -3243,7 +3439,7 @@ int select_insert::prepare2(void)
{
DBUG_ENTER("select_insert::prepare2");
if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
- !thd->prelocked_mode)
+ thd->locked_tables_mode <= LTM_LOCK_TABLES)
table->file->ha_start_bulk_insert((ha_rows) 0);
DBUG_RETURN(0);
}
@@ -3280,6 +3476,8 @@ int select_insert::send_data(List<Item> &values)
unit->offset_limit_cnt--;
DBUG_RETURN(0);
}
+ if (thd->killed == ABORT_QUERY)
+ DBUG_RETURN(0);
thd->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields
store_values(values);
@@ -3298,7 +3496,7 @@ int select_insert::send_data(List<Item> &values)
DBUG_RETURN(1);
}
}
-
+
// Release latches in case bulk insert takes a long time
ha_release_temporary_latches(thd);
@@ -3363,14 +3561,18 @@ bool select_insert::send_eof()
{
int error;
bool const trans_table= table->file->has_transactions();
- ulonglong id;
+ ulonglong id, row_count;
bool changed;
killed_state killed_status= thd->killed;
DBUG_ENTER("select_insert::send_eof");
DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
trans_table, table->file->table_type()));
- error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert() : 0;
+ error= (thd->locked_tables_mode <= LTM_LOCK_TABLES ?
+ table->file->ha_end_bulk_insert() : 0);
+ if (!error && thd->is_error())
+ error= thd->stmt_da->sql_errno();
+
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
@@ -3381,15 +3583,17 @@ bool select_insert::send_eof()
and ha_autocommit_or_rollback.
*/
query_cache_invalidate3(thd, table, 1);
- if (thd->transaction.stmt.modified_non_trans_table)
- thd->transaction.all.modified_non_trans_table= TRUE;
}
+
+ if (thd->transaction.stmt.modified_non_trans_table)
+ thd->transaction.all.modified_non_trans_table= TRUE;
+
DBUG_ASSERT(trans_table || !changed ||
thd->transaction.stmt.modified_non_trans_table);
/*
Write to binlog before commiting transaction. No statement will
- be written by the write_to_binlog() below in RBR mode. All the
+ be written by the binlog_query() below in RBR mode. All the
events are in the transaction cache and will be written when
ha_autocommit_or_rollback() is issued below.
*/
@@ -3401,8 +3605,9 @@ bool select_insert::send_eof()
thd->clear_error();
else
errcode= query_error_code(thd, killed_status == NOT_KILLED);
-
- if (write_to_binlog(trans_table, errcode))
+ if (thd->binlog_query(THD::ROW_QUERY_TYPE,
+ thd->query(), thd->query_length(),
+ trans_table, FALSE, FALSE, errcode))
{
table->file->ha_release_auto_increment();
DBUG_RETURN(1);
@@ -3418,26 +3623,27 @@ bool select_insert::send_eof()
char buff[160];
if (info.ignore)
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
- (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
+ (ulong) (info.records - info.copied),
+ (ulong) thd->warning_info->statement_warn_count());
else
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
- (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
- thd->row_count_func= info.copied + info.deleted +
- ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
- info.touched : info.updated);
-
+ (ulong) (info.deleted+info.updated),
+ (ulong) thd->warning_info->statement_warn_count());
+ row_count= info.copied + info.deleted +
+ ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
+ info.touched : info.updated);
id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
thd->first_successful_insert_id_in_cur_stmt :
(thd->arg_of_last_insert_id_function ?
thd->first_successful_insert_id_in_prev_stmt :
(info.copied ? autoinc_value_of_last_inserted_row : 0));
- ::my_ok(thd, (ulong) thd->row_count_func, id, buff);
+ ::my_ok(thd, row_count, id, buff);
DBUG_RETURN(0);
}
-void select_insert::abort() {
+void select_insert::abort_result_set() {
- DBUG_ENTER("select_insert::abort");
+ DBUG_ENTER("select_insert::abort_result_set");
/*
If the creation of the table failed (due to a syntax error, for
example), no table will have been opened and therefore 'table'
@@ -3451,7 +3657,7 @@ void select_insert::abort() {
If we are not in prelocked mode, we end the bulk insert started
before.
*/
- if (!thd->prelocked_mode)
+ if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
table->file->ha_end_bulk_insert();
/*
@@ -3472,14 +3678,17 @@ void select_insert::abort() {
transactional_table= table->file->has_transactions();
if (thd->transaction.stmt.modified_non_trans_table)
{
+ if (!can_rollback_data())
+ thd->transaction.all.modified_non_trans_table= TRUE;
+
if (mysql_bin_log.is_open())
{
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
/* error of writing binary log is ignored */
- write_to_binlog(transactional_table, errcode);
+ (void) thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
+ thd->query_length(),
+ transactional_table, FALSE, FALSE, errcode);
}
- if (!thd->current_stmt_binlog_row_based && !can_rollback_data())
- thd->transaction.all.modified_non_trans_table= TRUE;
if (changed)
query_cache_invalidate3(thd, table, 1);
}
@@ -3491,148 +3700,48 @@ void select_insert::abort() {
DBUG_VOID_RETURN;
}
-int select_insert::write_to_binlog(bool is_trans, int errcode)
-{
- /* It is only for statement mode */
- if (thd->current_stmt_binlog_row_based)
- return 0;
-
- return thd->binlog_query(THD::ROW_QUERY_TYPE,
- thd->query(), thd->query_length(),
- is_trans, FALSE, errcode);
-}
-
-/* Override the select_insert::write_to_binlog */
-int select_create::write_to_binlog(bool is_trans, int errcode)
-{
- /* It is only for statement mode */
- if (thd->current_stmt_binlog_row_based)
- return 0;
-
- /*
- WL#5370 Keep the compatibility between 5.1 master and 5.5 slave.
- Binlog a 'INSERT ... SELECT' statement only when it has the option
- 'IF NOT EXISTS' and the table already exists as a base table.
- */
- if ((create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS) &&
- create_info->table_existed)
- {
- String query;
- int result;
-
- thd->binlog_start_trans_and_stmt();
- /* Binlog the CREATE TABLE IF NOT EXISTS statement */
- result= binlog_show_create_table(&table, 1, 0);
- if (result)
- return result;
-
- uint db_len= strlen(create_table->db);
- uint table_len= strlen(create_info->alias);
- uint select_len= thd->query_length() - thd->lex->create_select_pos;
- uint field_len= (table->s->fields - (field - table->field)) *
- (MAX_FIELD_NAME + 3);
-
- /*
- pre-allocating memory reduces the times of reallocating memory,
- when calling query.appen().
- 40bytes is enough for other words("INSERT IGNORE INTO", etc.).
- */
- if (query.real_alloc(40 + db_len + table_len + field_len + select_len))
- return 1;
-
- if (thd->lex->create_select_in_comment)
- query.append(STRING_WITH_LEN("/*! "));
- if (thd->lex->ignore)
- query.append(STRING_WITH_LEN("INSERT IGNORE INTO `"));
- else if (thd->lex->duplicates == DUP_REPLACE)
- query.append(STRING_WITH_LEN("REPLACE INTO `"));
- else
- query.append(STRING_WITH_LEN("INSERT INTO `"));
-
- query.append(create_table->db, db_len);
- query.append(STRING_WITH_LEN("`.`"));
- query.append(create_info->alias, table_len);
- query.append(STRING_WITH_LEN("` "));
-
- /*
- The insert items.
- Field is the the rightmost columns that the rows are inster in.
- */
- query.append(STRING_WITH_LEN("("));
- for (Field **f= field ; *f ; f++)
- {
- if (f != field)
- query.append(STRING_WITH_LEN(","));
-
- query.append(STRING_WITH_LEN("`"));
- query.append((*f)->field_name, strlen((*f)->field_name));
- query.append(STRING_WITH_LEN("`"));
- }
- query.append(STRING_WITH_LEN(") "));
-
- /* The SELECT clause*/
- DBUG_ASSERT(thd->lex->create_select_pos);
- if (thd->lex->create_select_start_with_brace)
- query.append(STRING_WITH_LEN("("));
- if (query.append(thd->query() + thd->lex->create_select_pos, select_len))
- return 1;
-
- /*
- Avoid to use thd->binlog_query() twice, otherwise it will print the unsafe
- warning twice.
- */
- Query_log_event ev(thd, query.ptr(), query.length(), is_trans,
- FALSE, errcode);
- return mysql_bin_log.write(&ev);
- }
- else
- return select_insert::write_to_binlog(is_trans, errcode);
-}
/***************************************************************************
CREATE TABLE (SELECT) ...
***************************************************************************/
-/*
+/**
Create table from lists of fields and items (or just return TABLE
object for pre-opened existing table).
- SYNOPSIS
- create_table_from_items()
- thd in Thread object
- create_info in Create information (like MAX_ROWS, ENGINE or
- temporary table flag)
- create_table in Pointer to TABLE_LIST object providing database
- and name for table to be created or to be open
- alter_info in/out Initial list of columns and indexes for the table
- to be created
- items in List of items which should be used to produce rest
- of fields for the table (corresponding fields will
- be added to the end of alter_info->create_list)
- lock out Pointer to the MYSQL_LOCK object for table created
- (or open temporary table) will be returned in this
- parameter. Since this table is not included in
- THD::lock caller is responsible for explicitly
- unlocking this table.
- hooks
-
- NOTES
- This function behaves differently for base and temporary tables:
- - For base table we assume that either table exists and was pre-opened
- and locked at open_and_lock_tables() stage (and in this case we just
- emit error or warning and return pre-opened TABLE object) or special
- placeholder was put in table cache that guarantees that this table
- won't be created or opened until the placeholder will be removed
- (so there is an exclusive lock on this table).
- - We don't pre-open existing temporary table, instead we either open
- or create and then open table in this function.
-
+ @param thd [in] Thread object
+ @param create_info [in] Create information (like MAX_ROWS, ENGINE or
+ temporary table flag)
+ @param create_table [in] Pointer to TABLE_LIST object providing database
+ and name for table to be created or to be open
+ @param alter_info [in/out] Initial list of columns and indexes for the
+ table to be created
+ @param items [in] List of items which should be used to produce
+ rest of fields for the table (corresponding
+ fields will be added to the end of
+ alter_info->create_list)
+ @param lock [out] Pointer to the MYSQL_LOCK object for table
+ created will be returned in this parameter.
+ Since this table is not included in THD::lock
+ caller is responsible for explicitly unlocking
+ this table.
+ @param hooks [in] Hooks to be invoked before and after obtaining
+ table lock on the table being created.
+
+ @note
+ This function assumes that either table exists and was pre-opened and
+ locked at open_and_lock_tables() stage (and in this case we just emit
+ error or warning and return pre-opened TABLE object) or an exclusive
+ metadata lock was acquired on table so we can safely create, open and
+ lock table in it (we don't acquire metadata lock if this create is
+ for temporary table).
+
+ @note
Since this function contains some logic specific to CREATE TABLE ...
SELECT it should be changed before it can be used in other contexts.
- RETURN VALUES
- non-zero Pointer to TABLE object for table created or opened
- 0 Error
+ @retval non-zero Pointer to TABLE object for table created or opened
+ @retval 0 Error
*/
static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
@@ -3650,7 +3759,6 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
List_iterator_fast<Item> it(*items);
Item *item;
Field *tmp_field;
- bool not_used;
DBUG_ENTER("create_table_from_items");
tmp_table.alias= 0;
@@ -3688,7 +3796,7 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
alter_info->create_list.push_back(cr_field);
}
- DBUG_EXECUTE_IF("sleep_create_select_before_create", my_sleep(6000000););
+ DEBUG_SYNC(thd,"create_table_select_before_create");
/*
Create and lock table.
@@ -3707,30 +3815,21 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
open_table().
*/
{
- tmp_disable_binlog(thd);
if (!mysql_create_table_no_lock(thd, create_table->db,
create_table->table_name,
create_info, alter_info, 0,
- select_field_count))
+ select_field_count, NULL))
{
- if (create_info->table_existed &&
- !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
- {
- /*
- This means that someone created table underneath server
- or it was created via different mysqld front-end to the
- cluster. We don't have much options but throw an error.
- */
- my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
- DBUG_RETURN(0);
- }
-
- DBUG_EXECUTE_IF("sleep_create_select_before_open", my_sleep(6000000););
+ DEBUG_SYNC(thd,"create_table_select_before_open");
if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
{
- VOID(pthread_mutex_lock(&LOCK_open));
- if (reopen_name_locked_table(thd, create_table, FALSE))
+ Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
+ /*
+ Here we open the destination table, on which we already have
+ an exclusive metadata lock.
+ */
+ if (open_table(thd, create_table, thd->mem_root, &ot_ctx))
{
quick_rm_table(create_info->db_type, create_table->db,
table_case_name(create_info, create_table->table_name),
@@ -3738,34 +3837,41 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
}
else
table= create_table->table;
- VOID(pthread_mutex_unlock(&LOCK_open));
}
else
{
- if (!(table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
- MYSQL_OPEN_TEMPORARY_ONLY)) &&
- !create_info->table_existed)
+ Open_table_context ot_ctx(thd, MYSQL_OPEN_TEMPORARY_ONLY);
+ if (open_table(thd, create_table, thd->mem_root, &ot_ctx))
{
/*
This shouldn't happen as creation of temporary table should make
it preparable for open. But let us do close_temporary_table() here
just in case.
*/
- drop_temporary_table(thd, create_table);
+ drop_temporary_table(thd, create_table, NULL);
}
+ else
+ table= create_table->table;
}
}
- reenable_binlog(thd);
if (!table) // open failed
+ {
+ if (!thd->is_error()) // CREATE ... IF NOT EXISTS
+ my_ok(thd); // succeed, but did nothing
DBUG_RETURN(0);
+ }
}
- DBUG_EXECUTE_IF("sleep_create_select_before_lock", my_sleep(6000000););
+ DEBUG_SYNC(thd,"create_table_select_before_lock");
table->reginfo.lock_type=TL_WRITE;
hooks->prelock(&table, 1); // Call prelock hooks
- if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
- MYSQL_LOCK_IGNORE_FLUSH, &not_used)) ||
+ /*
+ mysql_lock_tables() below should never fail with request to reopen table
+ since it won't wait for the table lock (we have exclusive metadata lock on
+ the table) and thus can't get aborted.
+ */
+ if (! ((*lock)= mysql_lock_tables(thd, &table, 1, 0)) ||
hooks->postlock(&table, 1))
{
/* purecov: begin tested */
@@ -3779,9 +3885,7 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
mysql_unlock_tables(thd, *lock);
*lock= 0;
}
-
- if (!create_info->table_existed)
- drop_open_table(thd, table, create_table->db, create_table->table_name);
+ drop_open_table(thd, table, create_table->db, create_table->table_name);
DBUG_RETURN(0);
/* purecov: end */
}
@@ -3816,34 +3920,42 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
*/
class MY_HOOKS : public TABLEOP_HOOKS {
public:
- MY_HOOKS(select_create *x, TABLE_LIST *create_table,
- TABLE_LIST *select_tables)
- : ptr(x), all_tables(*create_table)
+ MY_HOOKS(select_create *x, TABLE_LIST *create_table_arg,
+ TABLE_LIST *select_tables_arg)
+ : ptr(x),
+ create_table(create_table_arg),
+ select_tables(select_tables_arg)
{
- all_tables.next_global= select_tables;
}
private:
virtual int do_postlock(TABLE **tables, uint count)
{
+ int error;
THD *thd= const_cast<THD*>(ptr->get_thd());
- if (int error= decide_logging_format(thd, &all_tables))
+ TABLE_LIST *save_next_global= create_table->next_global;
+
+ create_table->next_global= select_tables;
+
+ error= thd->decide_logging_format(create_table);
+
+ create_table->next_global= save_next_global;
+
+ if (error)
return error;
TABLE const *const table = *tables;
- if (thd->current_stmt_binlog_row_based &&
- !table->s->tmp_table &&
- !ptr->get_create_info()->table_existed)
+ if (thd->is_current_stmt_binlog_format_row() &&
+ !table->s->tmp_table)
{
- int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
- if (int error= ptr->binlog_show_create_table(tables, count, errcode))
+ if (int error= ptr->binlog_show_create_table(tables, count))
return error;
}
return 0;
}
-
select_create *ptr;
- TABLE_LIST all_tables;
+ TABLE_LIST *create_table;
+ TABLE_LIST *select_tables;
};
MY_HOOKS hooks(this, create_table, select_tables);
@@ -3857,44 +3969,21 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
temporary table, we need to start a statement transaction.
*/
if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
- thd->current_stmt_binlog_row_based &&
+ thd->is_current_stmt_binlog_format_row() &&
mysql_bin_log.is_open())
{
thd->binlog_start_trans_and_stmt();
}
- DBUG_EXECUTE_IF("sleep_create_select_before_check_if_exists", my_sleep(6000000););
+ DBUG_ASSERT(create_table->table == NULL);
- if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
- (create_table->table && create_table->table->db_stat))
- {
- /* Table already exists and was open at open_and_lock_tables() stage. */
- if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
- {
- /* Mark that table existed */
- create_info->table_existed= 1;
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
- ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
- create_table->table_name);
- if (thd->current_stmt_binlog_row_based)
- {
- int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
- binlog_show_create_table(&(create_table->table), 1, errcode);
- }
- table= create_table->table;
- }
- else
- {
- my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
- DBUG_RETURN(-1);
- }
- }
- else
- if (!(table= create_table_from_items(thd, create_info, create_table,
- alter_info, &values,
- &extra_lock, hook_ptr)))
- /* abort() deletes table */
- DBUG_RETURN(-1);
+ DEBUG_SYNC(thd,"create_table_select_before_check_if_exists");
+
+ if (!(table= create_table_from_items(thd, create_info, create_table,
+ alter_info, &values,
+ &extra_lock, hook_ptr)))
+ /* abort() deletes table */
+ DBUG_RETURN(-1);
if (extra_lock)
{
@@ -3934,7 +4023,7 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
if (info.handle_duplicates == DUP_UPDATE)
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
- if (!thd->prelocked_mode)
+ if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
table->file->ha_start_bulk_insert((ha_rows) 0);
thd->abort_on_warning= (!info.ignore &&
(thd->variables.sql_mode &
@@ -3948,10 +4037,10 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
}
int
-select_create::binlog_show_create_table(TABLE **tables, uint count, int errcode)
+select_create::binlog_show_create_table(TABLE **tables, uint count)
{
/*
- Note 1: We generate a CREATE TABLE statement for the
+ Note 1: In RBR mode, we generate a CREATE TABLE statement for the
created table by calling store_create_info() (behaves as SHOW
CREATE TABLE). In the event of an error, nothing should be
written to the binary log, even if the table is non-transactional;
@@ -3967,6 +4056,7 @@ select_create::binlog_show_create_table(TABLE **tables, uint count, int errcode)
schema that will do a close_thread_tables(), destroying the
statement transaction cache.
*/
+ DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
DBUG_ASSERT(tables && *tables && count > 0);
char buf[2048];
@@ -3984,9 +4074,11 @@ select_create::binlog_show_create_table(TABLE **tables, uint count, int errcode)
if (mysql_bin_log.is_open())
{
+ int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
result= thd->binlog_query(THD::STMT_QUERY_TYPE,
query.ptr(), query.length(),
/* is_trans */ TRUE,
+ /* direct */ FALSE,
/* suppress_use */ FALSE,
errcode);
}
@@ -4006,15 +4098,11 @@ void select_create::send_error(uint errcode,const char *err)
DBUG_PRINT("info",
("Current statement %s row-based",
- thd->current_stmt_binlog_row_based ? "is" : "is NOT"));
+ thd->is_current_stmt_binlog_format_row() ? "is" : "is NOT"));
DBUG_PRINT("info",
("Current table (at 0x%lu) %s a temporary (or non-existant) table",
(ulong) table,
table && !table->s->tmp_table ? "is NOT" : "is"));
- DBUG_PRINT("info",
- ("Table %s prior to executing this statement",
- get_create_info()->table_existed ? "existed" : "did not exist"));
-
/*
This will execute any rollbacks that are necessary before writing
the transcation cache.
@@ -4038,7 +4126,7 @@ bool select_create::send_eof()
{
bool tmp=select_insert::send_eof();
if (tmp)
- abort();
+ abort_result_set();
else
{
/*
@@ -4048,8 +4136,8 @@ bool select_create::send_eof()
*/
if (!table->s->tmp_table)
{
- ha_autocommit_or_rollback(thd, 0);
- end_active_trans(thd);
+ trans_commit_stmt(thd);
+ trans_commit_implicit(thd);
}
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
@@ -4065,12 +4153,12 @@ bool select_create::send_eof()
}
-void select_create::abort()
+void select_create::abort_result_set()
{
- DBUG_ENTER("select_create::abort");
+ DBUG_ENTER("select_create::abort_result_set");
/*
- In select_insert::abort() we roll back the statement, including
+ In select_insert::abort_result_set() we roll back the statement, including
truncating the transaction cache of the binary log. To do this, we
pretend that the statement is transactional, even though it might
be the case that it was not.
@@ -4085,11 +4173,11 @@ void select_create::abort()
log state.
*/
tmp_disable_binlog(thd);
- select_insert::abort();
+ select_insert::abort_result_set();
thd->transaction.stmt.modified_non_trans_table= FALSE;
reenable_binlog(thd);
/* possible error of writing binary log is ignored deliberately */
- (void)thd->binlog_flush_pending_rows_event(TRUE);
+ (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE);
if (m_plock)
{
@@ -4100,21 +4188,10 @@ void select_create::abort()
if (table)
{
- if (thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
- thd->current_stmt_binlog_row_based &&
- !(thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) &&
- mysql_bin_log.is_open())
- {
- /*
- This should be removed after BUG#47899.
- */
- mysql_bin_log.reset_gathered_updates(thd);
- }
-
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
- if (!create_info->table_existed)
- drop_open_table(thd, table, create_table->db, create_table->table_name);
+ table->auto_increment_field_not_null= FALSE;
+ drop_open_table(thd, table, create_table->db, create_table->table_name);
table=0; // Safety
}
DBUG_VOID_RETURN;