summaryrefslogtreecommitdiff
path: root/sql/sql_insert.cc
diff options
context:
space:
mode:
authorlars@mysql.com <>2005-12-22 06:39:02 +0100
committerlars@mysql.com <>2005-12-22 06:39:02 +0100
commitad126d90e019f223470e73e1b2b528f9007c4532 (patch)
tree192a6c65973c50f6a436a8c6e6bb19efa2ed2419 /sql/sql_insert.cc
parent0f8f444b8be51db1265a3c30a576c9e404b86071 (diff)
downloadmariadb-git-ad126d90e019f223470e73e1b2b528f9007c4532.tar.gz
WL#1012: All changes as one single changeset.
This includes both code and test cases.
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r--sql/sql_insert.cc228
1 files changed, 188 insertions, 40 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 72a2f4a4f91..e053f06df55 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -21,6 +21,7 @@
#include "sp_head.h"
#include "sql_trigger.h"
#include "sql_select.h"
+#include "sql_show.h"
static int check_null_fields(THD *thd,TABLE *entry);
#ifndef EMBEDDED_LIBRARY
@@ -576,10 +577,13 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
{
if (error <= 0)
thd->clear_error();
- Query_log_event qinfo(thd, thd->query, thd->query_length,
- transactional_table, FALSE);
- if (mysql_bin_log.write(&qinfo) && transactional_table)
- error=1;
+ if (thd->binlog_query(THD::ROW_QUERY_TYPE,
+ thd->query, thd->query_length,
+ transactional_table, FALSE) &&
+ transactional_table)
+ {
+ error=1;
+ }
}
if (!transactional_table)
thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
@@ -945,10 +949,11 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
DBUG_ENTER("write_record");
info->records++;
+
if (info->handle_duplicates == DUP_REPLACE ||
info->handle_duplicates == DUP_UPDATE)
{
- while ((error=table->file->write_row(table->record[0])))
+ while ((error=table->file->ha_write_row(table->record[0])))
{
uint key_nr;
if (error != HA_WRITE_SKIP)
@@ -1032,7 +1037,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
thd->clear_next_insert_id= 0;
thd->next_insert_id= 0;
}
- if ((error=table->file->update_row(table->record[1],table->record[0])))
+ if ((error=table->file->ha_update_row(table->record[1],table->record[0])))
{
if ((error == HA_ERR_FOUND_DUPP_KEY) && info->ignore)
goto ok_or_after_trg_err;
@@ -1071,8 +1076,8 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
thd->clear_next_insert_id= 0;
thd->next_insert_id= 0;
}
- if ((error=table->file->update_row(table->record[1],
- table->record[0])))
+ if ((error=table->file->ha_update_row(table->record[1],
+ table->record[0])))
goto err;
info->deleted++;
trg_error= (table->triggers &&
@@ -1089,7 +1094,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
TRG_ACTION_BEFORE, TRUE))
goto before_trg_err;
- if ((error=table->file->delete_row(table->record[1])))
+ if ((error=table->file->ha_delete_row(table->record[1])))
goto err;
info->deleted++;
if (!table->file->has_transactions())
@@ -1110,7 +1115,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info)
table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
TRG_ACTION_AFTER, TRUE));
}
- else if ((error=table->file->write_row(table->record[0])))
+ else if ((error=table->file->ha_write_row(table->record[0])))
{
if (!info->ignore ||
(error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE))
@@ -1196,16 +1201,15 @@ int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
class delayed_row :public ilink {
public:
- char *record,*query;
+ char *record;
enum_duplicates dup;
time_t start_time;
bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query;
ulonglong last_insert_id;
timestamp_auto_set_type timestamp_field_type;
- uint query_length;
delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg)
- :record(0), query(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {}
+ :record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {}
~delayed_row()
{
x_free(record);
@@ -1215,6 +1219,9 @@ public:
class delayed_insert :public ilink {
uint locks_in_memory;
+ char *query;
+ ulong query_length;
+ ulong query_allocated;
public:
THD thd;
TABLE *table;
@@ -1228,7 +1235,7 @@ public:
TABLE_LIST table_list; // Argument
delayed_insert()
- :locks_in_memory(0),
+ :locks_in_memory(0), query(0), query_length(0), query_allocated(0),
table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
group_count(0)
{
@@ -1254,6 +1261,7 @@ public:
}
~delayed_insert()
{
+ my_free(query, MYF(MY_WME|MY_ALLOW_ZERO_PTR));
/* The following is not really needed, but just for safety */
delayed_row *row;
while ((row=rows.get()))
@@ -1273,6 +1281,25 @@ public:
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
}
+ int set_query(char const *q, ulong qlen) {
+ if (q && qlen > 0)
+ {
+ if (query_allocated < qlen + 1)
+ {
+ ulong const flags(MY_WME|MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR);
+ query= my_realloc(query, qlen + 1, MYF(flags));
+ if (query == 0)
+ return HA_ERR_OUT_OF_MEM;
+ query_allocated= qlen;
+ }
+ query_length= qlen;
+ memcpy(query, q, qlen + 1);
+ }
+ else
+ query_length= 0;
+ return 0;
+ }
+
/* The following is for checking when we can delete ourselves */
inline void lock()
{
@@ -1562,18 +1589,22 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on)))
goto err;
+#if 0
if (!query)
query_length=0;
- if (!(row->record= (char*) my_malloc(table->s->reclength+query_length+1,
- MYF(MY_WME))))
+#endif
+ if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
goto err;
memcpy(row->record, table->record[0], table->s->reclength);
+ di->set_query(query, query_length);
+#if 0
if (query_length)
{
row->query= row->record+table->s->reclength;
memcpy(row->query,query,query_length+1);
}
row->query_length= query_length;
+#endif
row->start_time= thd->start_time;
row->query_start_used= thd->query_start_used;
row->last_insert_id_used= thd->last_insert_id_used;
@@ -1897,7 +1928,21 @@ bool delayed_insert::handle_inserts(void)
{
int error;
ulong max_rows;
- bool using_ignore=0, using_bin_log=mysql_bin_log.is_open();
+ bool using_ignore=0,
+ using_bin_log= mysql_bin_log.is_open();
+
+#if 0
+ /*
+ The actual text for the query is added to the first row in the
+ list. Since the row is destroyed, with all it's memory, we need
+ to take a copy of it to be able to log it after all rows have been
+ applied.
+ */
+ uint const query_length= rows.head()->query_length;
+ char *const query= static_cast<char*>(my_alloca(query_length+1));
+ memcpy(query, rows.head()->query, query_length);
+#endif
+
delayed_row *row;
DBUG_ENTER("handle_inserts");
@@ -1963,11 +2008,6 @@ bool delayed_insert::handle_inserts(void)
using_ignore=0;
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
}
- if (row->query && row->log_query && using_bin_log)
- {
- Query_log_event qinfo(&thd, row->query, row->query_length, 0, FALSE);
- mysql_bin_log.write(&qinfo);
- }
if (table->s->blob_fields)
free_delayed_insert_blobs(table);
thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
@@ -1982,8 +2022,7 @@ bool delayed_insert::handle_inserts(void)
on this table until all entries has been processed
*/
if (group_count++ >= max_rows && (row= rows.head()) &&
- (!(row->log_query & using_bin_log) ||
- row->query))
+ (!(row->log_query & using_bin_log)))
{
group_count=0;
if (stacked_inserts || tables_in_use) // Let these wait a while
@@ -2019,6 +2058,10 @@ bool delayed_insert::handle_inserts(void)
thd.proc_info=0;
table->next_number_field=0;
pthread_mutex_unlock(&mutex);
+
+ /* After releasing the mutex, to prevent deadlocks. */
+ thd.binlog_query(THD::ROW_QUERY_TYPE, query, query_length, FALSE, FALSE);
+
if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
{ // This shouldn't happen
table->file->print_error(error,MYF(0));
@@ -2216,6 +2259,16 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
check_that_all_fields_are_given_values(thd, table, table_list)) ||
table_list->prepare_where(thd, 0, TRUE) ||
table_list->prepare_check_option(thd));
+
+ /*
+ For non-transactional non-temporary tables, we set the
+ OPTION_STATUS_NO_TRANS_UPDATE flag here. The send_eof() function
+ is used by both the select_insert and the select_create classes,
+ so setting it there would clash.
+ */
+ if (!(table->file->has_transactions() || table->s->tmp_table))
+ thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
+
DBUG_RETURN(res);
}
@@ -2345,9 +2398,31 @@ void select_insert::send_error(uint errcode,const char *err)
table->file->end_bulk_insert();
/*
If at least one row has been inserted/modified and will stay in the table
- (the table doesn't have transactions) (example: we got a duplicate key
- error while inserting into a MyISAM table) we must write to the binlog (and
+ (the table doesn't have transactions) we must write to the binlog (and
the error code will make the slave stop).
+
+ For many errors (example: we got a duplicate key error while
+ inserting into a MyISAM table), no row will be added to the table,
+ so passing the error to the slave will not help since there will
+ be an error code mismatch (the inserts will succeed on the slave
+ with no error).
+
+ If we are using row-based replication we have two cases where this
+ code is executed: replication of CREATE-SELECT and replication of
+ INSERT-SELECT.
+
+ When replicating a CREATE-SELECT statement, we shall not write the
+ events to the binary log. To prevent the ha_rollback_stmt() below
+ from writing to the binary log, we have to pretend that the table
+ is transactional, even if it actually is not. Therefore, the
+ OPTION_STATUS_NO_TRANS_UPDATE is cleared in
+ select_create::prepare() and will remain cleared here.
+
+ When replicating INSERT-SELECT, we shall not write the events to
+ the binary log for transactional table, but shall write all events
+ if there is one or more writes to non-transactional tables. In
+ this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a
+ write to a non-transactional table, otherwise it is cleared.
*/
if ((info.copied || info.deleted || info.updated) &&
!table->file->has_transactions())
@@ -2356,11 +2431,10 @@ void select_insert::send_error(uint errcode,const char *err)
thd->insert_id(last_insert_id); // For binary log
if (mysql_bin_log.is_open())
{
- Query_log_event qinfo(thd, thd->query, thd->query_length,
- table->file->has_transactions(), FALSE);
- mysql_bin_log.write(&qinfo);
+ thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
+ table->file->has_transactions(), FALSE);
}
- if (!table->s->tmp_table)
+ if (!binlog_row_based && !table->s->tmp_table)
thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
}
if (info.copied || info.deleted || info.updated)
@@ -2382,26 +2456,36 @@ bool select_insert::send_eof()
/*
We must invalidate the table in the query cache before binlog writing
- and ha_autocommit_or_rollback
- */
+ and ha_autocommit_or_rollback.
+
+ If nothing was inserted in the table, there is no need to emit a
+ ROLLBACK statement to the binary log, so in that case we clear
+ OPTION_STATUS_NO_TRANS_UPDATE.
+ Observe that select_insert::send_eof() is used by both
+ select_insert and select_create and that they set the flag in
+ different manners. See Note 1 below for more info.
+ */
if (info.copied || info.deleted || info.updated)
- {
query_cache_invalidate3(thd, table, 1);
- if (!(table->file->has_transactions() || table->s->tmp_table))
- thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
- }
+ else
+ thd->options&= ~OPTION_STATUS_NO_TRANS_UPDATE;
if (last_insert_id)
thd->insert_id(last_insert_id); // For binary log
- /* Write to binlog before commiting transaction */
+ /*
+ Write to binlog before commiting transaction. No statement will
+ be written by the binlog_query() below in RBR mode. All the
+ events are in the transaction cache and will be written when
+ ha_autocommit_or_rollback() is issued below.
+ */
if (mysql_bin_log.is_open())
{
if (!error)
thd->clear_error();
- Query_log_event qinfo(thd, thd->query, thd->query_length,
- table->file->has_transactions(), FALSE);
- mysql_bin_log.write(&qinfo);
+ thd->binlog_query(THD::ROW_QUERY_TYPE,
+ thd->query, thd->query_length,
+ table->file->has_transactions(), FALSE);
}
if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error)
error=error2;
@@ -2467,8 +2551,62 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
}
+void
+select_create::binlog_show_create_table()
+{
+ /*
+ Note 1: In RBR mode, we generate a CREATE TABLE statement for the
+ created table by calling store_create_info() (behaves as SHOW
+ CREATE TABLE). In the event of an error, nothing should be
+ written to the binary log, even if the table is non-transactional;
+ therefore we pretend that the generated CREATE TABLE statement is
+ for a transactional table. The event will then be put in the
+ transaction cache, and any subsequent events (e.g., table-map
+ events and binrow events) will also be put there. We can then use
+ ha_autocommit_or_rollback() to either throw away the entire
+ kaboodle of events, or write them to the binary log.
+
+ We write the CREATE TABLE statement here and not in prepare()
+ since there potentially are sub-selects or accesses to information
+ schema that will do a close_thread_tables(), destroying the
+ statement transaction cache.
+
+ To ensure that the event kaboodle is not written to the binary log
+ on rollback, we clear the OPTION_STATUS_NO_TRANS_UPDATE bit of
+ thd->options.
+ */
+ DBUG_ASSERT(binlog_row_based && !create_table_written);
+
+ thd->options&= ~OPTION_STATUS_NO_TRANS_UPDATE;
+ char buf[2048];
+ String query(buf, sizeof(buf), system_charset_info);
+ query.length(0); // Have to zero it since constructor doesn't
+
+ TABLE_LIST tables;
+ memset(&tables, 0, sizeof(tables));
+ tables.table = table;
+
+ int result= store_create_info(thd, &tables, &query, create_info);
+ DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
+ thd->binlog_query(THD::STMT_QUERY_TYPE,
+ query.ptr(), query.length(),
+ /* is_trans */ TRUE,
+ /* suppress_use */ FALSE);
+}
+
+
void select_create::store_values(List<Item> &values)
{
+ /*
+ Before writing the first row, we write the CREATE TABLE statement
+ to the binlog.
+ */
+ if (binlog_row_based && !create_table_written)
+ {
+ binlog_show_create_table();
+ create_table_written= TRUE;
+ }
+
fill_record_n_invoke_before_triggers(thd, field, values, 1,
table->triggers, TRG_EVENT_INSERT);
}
@@ -2488,6 +2626,16 @@ void select_create::send_error(uint errcode,const char *err)
bool select_create::send_eof()
{
+ /*
+ If no rows where written to the binary log, we write the CREATE
+ TABLE statement to the binlog.
+ */
+ if (binlog_row_based && !create_table_written)
+ {
+ binlog_show_create_table();
+ create_table_written= TRUE;
+ }
+
bool tmp=select_insert::send_eof();
if (tmp)
abort();