summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/mysql.h.pp16
-rw-r--r--include/mysql_com.h26
-rw-r--r--sql/field.cc40
-rw-r--r--sql/field.h3
-rw-r--r--sql/item.cc73
-rw-r--r--sql/item.h14
-rw-r--r--sql/protocol.cc1
-rw-r--r--sql/share/errmsg-utf8.txt5
-rw-r--r--sql/sql_base.cc26
-rw-r--r--sql/sql_class.cc17
-rw-r--r--sql/sql_class.h17
-rw-r--r--sql/sql_error.cc26
-rw-r--r--sql/sql_error.h29
-rw-r--r--sql/sql_insert.cc178
-rw-r--r--sql/sql_parse.cc10
-rw-r--r--sql/sql_prepare.cc288
-rw-r--r--sql/sql_prepare.h3
-rw-r--r--sql/wsrep_thd.cc3
-rw-r--r--storage/perfschema/pfs.cc2
19 files changed, 610 insertions, 167 deletions
diff --git a/include/mysql.h.pp b/include/mysql.h.pp
index 1ce4c057336..aa9a0aaa266 100644
--- a/include/mysql.h.pp
+++ b/include/mysql.h.pp
@@ -11,11 +11,17 @@ enum enum_server_command
COM_STMT_RESET, COM_SET_OPTION, COM_STMT_FETCH, COM_DAEMON,
COM_MDB_GAP_BEG,
COM_MDB_GAP_END=250,
- COM_SLAVE_WORKER,
- COM_SLAVE_IO,
- COM_SLAVE_SQL,
- COM_MULTI,
- COM_END
+ COM_SLAVE_WORKER=251,
+ COM_SLAVE_IO=252,
+ COM_SLAVE_SQL=253,
+ COM_MULTI=254,
+ COM_END=255
+};
+enum enum_indicator_type
+{
+ STMT_INDICATOR_NONE= 0,
+ STMT_INDICATOR_NULL,
+ STMT_INDICATOR_DEFAULT
};
struct st_vio;
typedef struct st_vio Vio;
diff --git a/include/mysql_com.h b/include/mysql_com.h
index 57e311538d8..36d6ad526f8 100644
--- a/include/mysql_com.h
+++ b/include/mysql_com.h
@@ -114,12 +114,23 @@ enum enum_server_command
/* don't forget to update const char *command_name[] in sql_parse.cc */
COM_MDB_GAP_BEG,
COM_MDB_GAP_END=250,
- COM_SLAVE_WORKER,
- COM_SLAVE_IO,
- COM_SLAVE_SQL,
- COM_MULTI,
+ COM_SLAVE_WORKER=251,
+ COM_SLAVE_IO=252,
+ COM_SLAVE_SQL=253,
+ COM_MULTI=254,
/* Must be last */
- COM_END
+ COM_END=255
+};
+
+
+/*
+ Bulk PS protocol indicator value:
+*/
+enum enum_indicator_type
+{
+ STMT_INDICATOR_NONE= 0,
+ STMT_INDICATOR_NULL,
+ STMT_INDICATOR_DEFAULT
};
/* sql type stored in .frm files for virtual fields */
@@ -256,6 +267,8 @@ enum enum_server_command
#define MARIADB_CLIENT_PROGRESS (1ULL << 32)
/* support COM_MULTI */
#define MARIADB_CLIENT_COM_MULTI (1ULL << 33)
+/* support of array binding */
+#define MARIADB_CLIENT_STMT_BULK_OPERATIONS (1UL << 34)
#ifdef HAVE_COMPRESS
#define CAN_CLIENT_COMPRESS CLIENT_COMPRESS
@@ -295,7 +308,8 @@ enum enum_server_command
CLIENT_SESSION_TRACK |\
CLIENT_DEPRECATE_EOF |\
CLIENT_CONNECT_ATTRS |\
- MARIADB_CLIENT_COM_MULTI)
+ MARIADB_CLIENT_COM_MULTI |\
+ MARIADB_CLIENT_STMT_BULK_OPERATIONS)
/*
To be added later:
diff --git a/sql/field.cc b/sql/field.cc
index e4ecb7422f2..609d44f4856 100644
--- a/sql/field.cc
+++ b/sql/field.cc
@@ -10821,3 +10821,43 @@ bool Field::validate_value_in_record_with_warn(THD *thd, const uchar *record)
dbug_tmp_restore_column_map(table->read_set, old_map);
return rc;
}
+
+
+bool Field::save_in_field_default_value(bool view_error_processing)
+{
+ THD *thd= table->in_use;
+
+ if (flags & NO_DEFAULT_VALUE_FLAG &&
+ real_type() != MYSQL_TYPE_ENUM)
+ {
+ if (reset())
+ {
+ my_message(ER_CANT_CREATE_GEOMETRY_OBJECT,
+ ER_THD(thd, ER_CANT_CREATE_GEOMETRY_OBJECT), MYF(0));
+ return -1;
+ }
+
+ if (view_error_processing)
+ {
+ TABLE_LIST *view= table->pos_in_table_list->top_table();
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_NO_DEFAULT_FOR_VIEW_FIELD,
+ ER_THD(thd, ER_NO_DEFAULT_FOR_VIEW_FIELD),
+ view->view_db.str,
+ view->view_name.str);
+ }
+ else
+ {
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_NO_DEFAULT_FOR_FIELD,
+ ER_THD(thd, ER_NO_DEFAULT_FOR_FIELD),
+ field_name);
+ }
+ return 1;
+ }
+ set_default();
+ return
+ !is_null() &&
+ validate_value_in_record_with_warn(thd, table->record[0]) &&
+ thd->is_error() ? -1 : 0;
+}
diff --git a/sql/field.h b/sql/field.h
index f550dad1c6c..c9d7509618a 100644
--- a/sql/field.h
+++ b/sql/field.h
@@ -1451,6 +1451,9 @@ public:
// Exactly the same rules with REF access
return can_optimize_keypart_ref(cond, item);
}
+
+ bool save_in_field_default_value(bool view_eror_processing);
+
friend int cre_myisam(char * name, register TABLE *form, uint options,
ulonglong auto_increment_value);
friend class Copy_field;
diff --git a/sql/item.cc b/sql/item.cc
index 2388679e424..70b7383d619 100644
--- a/sql/item.cc
+++ b/sql/item.cc
@@ -3236,6 +3236,7 @@ Item_param::Item_param(THD *thd, uint pos_in_query_arg):
Rewritable_query_parameter(pos_in_query_arg, 1),
Type_handler_hybrid_field_type(MYSQL_TYPE_VARCHAR),
state(NO_VALUE),
+ indicators(0), indicator(STMT_INDICATOR_NONE),
/* Don't pretend to be a literal unless value for this item is set. */
item_type(PARAM_ITEM),
set_param_func(default_set_param_func),
@@ -3600,6 +3601,10 @@ int Item_param::save_in_field(Field *field, bool no_conversions)
str_value.charset());
case NULL_VALUE:
return set_field_to_null_with_conversions(field, no_conversions);
+ case DEFAULT_VALUE:
+ return field->save_in_field_default_value(field->table->pos_in_table_list->
+ top_table() !=
+ field->table->pos_in_table_list);
case NO_VALUE:
default:
DBUG_ASSERT(0);
@@ -3645,6 +3650,9 @@ double Item_param::val_real()
return TIME_to_double(&value.time);
case NULL_VALUE:
return 0.0;
+ case DEFAULT_VALUE:
+ my_message(ER_INVALID_DEFAULT_PARAM,
+ ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
default:
DBUG_ASSERT(0);
}
@@ -3672,6 +3680,9 @@ longlong Item_param::val_int()
}
case TIME_VALUE:
return (longlong) TIME_to_ulonglong(&value.time);
+ case DEFAULT_VALUE:
+ my_message(ER_INVALID_DEFAULT_PARAM,
+ ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return 0;
default:
@@ -3699,6 +3710,9 @@ my_decimal *Item_param::val_decimal(my_decimal *dec)
{
return TIME_to_my_decimal(&value.time, dec);
}
+ case DEFAULT_VALUE:
+ my_message(ER_INVALID_DEFAULT_PARAM,
+ ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return 0;
default:
@@ -3734,6 +3748,9 @@ String *Item_param::val_str(String* str)
str->set_charset(&my_charset_bin);
return str;
}
+ case DEFAULT_VALUE:
+ my_message(ER_INVALID_DEFAULT_PARAM,
+ ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return NULL;
default:
@@ -3812,6 +3829,9 @@ const String *Item_param::query_val_str(THD *thd, String* str) const
thd->variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES);
break;
}
+ case DEFAULT_VALUE:
+ my_message(ER_INVALID_DEFAULT_PARAM,
+ ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return &my_null_string;
default:
@@ -3862,6 +3882,9 @@ Item_param::clone_item(THD *thd)
{
MEM_ROOT *mem_root= thd->mem_root;
switch (state) {
+ case DEFAULT_VALUE:
+ my_message(ER_INVALID_DEFAULT_PARAM,
+ ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return new (mem_root) Item_null(thd, name);
case INT_VALUE:
@@ -3894,6 +3917,9 @@ Item_param::eq(const Item *item, bool binary_cmp) const
return FALSE;
switch (state) {
+ case DEFAULT_VALUE:
+ my_message(ER_INVALID_DEFAULT_PARAM,
+ ER_THD(current_thd, ER_INVALID_DEFAULT_PARAM), MYF(0));
case NULL_VALUE:
return null_eq(item);
case INT_VALUE:
@@ -3917,6 +3943,10 @@ void Item_param::print(String *str, enum_query_type query_type)
{
str->append('?');
}
+ else if (state == DEFAULT_VALUE)
+ {
+ str->append("default");
+ }
else
{
char buffer[STRING_BUFFER_USUAL_SIZE];
@@ -3968,6 +3998,11 @@ Item_param::set_param_type_and_swap_value(Item_param *src)
}
+void Item_param::set_default()
+{
+ state= DEFAULT_VALUE;
+}
+
/**
This operation is intended to store some item value in Item_param to be
used later.
@@ -8579,42 +8614,8 @@ int Item_default_value::save_in_field(Field *field_arg, bool no_conversions)
calculate();
else
{
- TABLE *table= field_arg->table;
- THD *thd= table->in_use;
-
- if (field_arg->flags & NO_DEFAULT_VALUE_FLAG &&
- field_arg->real_type() != MYSQL_TYPE_ENUM)
- {
- if (field_arg->reset())
- {
- my_message(ER_CANT_CREATE_GEOMETRY_OBJECT,
- ER_THD(thd, ER_CANT_CREATE_GEOMETRY_OBJECT), MYF(0));
- return -1;
- }
-
- if (context->error_processor == &view_error_processor)
- {
- TABLE_LIST *view= table->pos_in_table_list->top_table();
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_NO_DEFAULT_FOR_VIEW_FIELD,
- ER_THD(thd, ER_NO_DEFAULT_FOR_VIEW_FIELD),
- view->view_db.str,
- view->view_name.str);
- }
- else
- {
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_NO_DEFAULT_FOR_FIELD,
- ER_THD(thd, ER_NO_DEFAULT_FOR_FIELD),
- field_arg->field_name);
- }
- return 1;
- }
- field_arg->set_default();
- return
- !field_arg->is_null() &&
- field_arg->validate_value_in_record_with_warn(thd, table->record[0]) &&
- thd->is_error() ? -1 : 0;
+ return field_arg->save_in_field_default_value(context->error_processor ==
+ &view_error_processor);
}
return Item_field::save_in_field(field_arg, no_conversions);
}
diff --git a/sql/item.h b/sql/item.h
index ab70fdb7dc1..d0d845711f7 100644
--- a/sql/item.h
+++ b/sql/item.h
@@ -2798,7 +2798,7 @@ public:
{
NO_VALUE, NULL_VALUE, INT_VALUE, REAL_VALUE,
STRING_VALUE, TIME_VALUE, LONG_DATA_VALUE,
- DECIMAL_VALUE
+ DECIMAL_VALUE, DEFAULT_VALUE
} state;
struct CONVERSION_INFO
@@ -2843,6 +2843,13 @@ public:
};
/*
+ Used for bulk protocol. Indicates if we should expect
+ indicators byte before value of the parameter
+ */
+ my_bool indicators;
+ enum enum_indicator_type indicator;
+
+ /*
A buffer for string and long data values. Historically all allocated
values returned from val_str() were treated as eligible to
modification. I. e. in some cases Item_func_concat can append it's
@@ -2882,6 +2889,7 @@ public:
bool get_date(MYSQL_TIME *tm, ulonglong fuzzydate);
int save_in_field(Field *field, bool no_conversions);
+ void set_default();
void set_null();
void set_int(longlong i, uint32 max_length_arg);
void set_double(double i);
@@ -5102,6 +5110,10 @@ public:
:Item_field(thd, context_arg, (const char *)NULL, (const char *)NULL,
(const char *)NULL),
arg(a) {}
+ Item_default_value(THD *thd, Name_resolution_context *context_arg, Field *a)
+ :Item_field(thd, context_arg, (const char *)NULL, (const char *)NULL,
+ (const char *)NULL),
+ arg(NULL) {}
enum Type type() const { return DEFAULT_VALUE_ITEM; }
bool eq(const Item *item, bool binary_cmp) const;
bool fix_fields(THD *, Item **);
diff --git a/sql/protocol.cc b/sql/protocol.cc
index a3085c69f17..f8b68c02fff 100644
--- a/sql/protocol.cc
+++ b/sql/protocol.cc
@@ -572,6 +572,7 @@ void Protocol::end_statement()
thd->get_stmt_da()->statement_warn_count());
break;
case Diagnostics_area::DA_OK:
+ case Diagnostics_area::DA_OK_BULK:
error= send_ok(thd->server_status,
thd->get_stmt_da()->statement_warn_count(),
thd->get_stmt_da()->affected_rows(),
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index d42611bfebe..ade788623e3 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -7232,3 +7232,8 @@ ER_PARTITION_DEFAULT_ERROR
ukr "Припустимо мати тільки один DEFAULT розділ"
ER_REFERENCED_TRG_DOES_NOT_EXIST
eng "Referenced trigger '%s' for the given action time and event type does not exist"
+ER_INVALID_DEFAULT_PARAM
+ eng "Default value is not supported for such parameter usage"
+ ukr "Значення за замовчуванням не підтримано для цього випадку використання параьетра"
+ER_BINLOG_NON_SUPPORTED_BULK
+ eng "Only row based replication supported for bulk operations"
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index d3832a7068e..2d40631d79f 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -7810,9 +7810,10 @@ fill_record(THD *thd, TABLE *table_arg, List<Item> &fields, List<Item> &values,
if (table->next_number_field &&
rfield->field_index == table->next_number_field->field_index)
table->auto_increment_field_not_null= TRUE;
- if (rfield->vcol_info &&
- value->type() != Item::DEFAULT_VALUE_ITEM &&
- value->type() != Item::NULL_ITEM &&
+ Item::Type type= value->type();
+ if (rfield->vcol_info &&
+ type != Item::DEFAULT_VALUE_ITEM &&
+ type != Item::NULL_ITEM &&
table->s->table_category != TABLE_CATEGORY_TEMPORARY)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
@@ -8060,15 +8061,18 @@ fill_record(THD *thd, TABLE *table, Field **ptr, List<Item> &values,
value=v++;
if (field->field_index == autoinc_index)
table->auto_increment_field_not_null= TRUE;
- if (field->vcol_info &&
- value->type() != Item::DEFAULT_VALUE_ITEM &&
- value->type() != Item::NULL_ITEM &&
- table->s->table_category != TABLE_CATEGORY_TEMPORARY)
+ if (field->vcol_info)
{
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN,
- ER_THD(thd, ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN),
- field->field_name, table->s->table_name.str);
+ Item::Type type= value->type();
+ if (type != Item::DEFAULT_VALUE_ITEM &&
+ type != Item::NULL_ITEM &&
+ table->s->table_category != TABLE_CATEGORY_TEMPORARY)
+ {
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN,
+ ER_THD(thd, ER_WARNING_NON_DEFAULT_VALUE_FOR_VIRTUAL_COLUMN),
+ field->field_name, table->s->table_name.str);
+ }
}
if (use_value)
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 1af3b9a9cca..b896f4567af 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -854,6 +854,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
in_sub_stmt(0), log_all_errors(0),
binlog_unsafe_warning_flags(0),
binlog_table_maps(0),
+ bulk_param(0),
table_map_for_update(0),
m_examined_row_count(0),
accessed_rows_and_keys(0),
@@ -5701,6 +5702,17 @@ int THD::decide_logging_format(TABLE_LIST *tables)
!(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db)))
{
+
+ if (is_bulk_op())
+ {
+ if (wsrep_binlog_format() == BINLOG_FORMAT_STMT)
+ {
+ my_error(ER_BINLOG_NON_SUPPORTED_BULK, MYF(0));
+ DBUG_PRINT("info",
+ ("decision: no logging since an error was generated"));
+ DBUG_RETURN(-1);
+ }
+ }
/*
Compute one bit field with the union of all the engine
capabilities, and one with the intersection of all the engine
@@ -5959,7 +5971,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
*/
my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0));
}
- else if (wsrep_binlog_format() == BINLOG_FORMAT_ROW &&
+ else if ((wsrep_binlog_format() == BINLOG_FORMAT_ROW || is_bulk_op()) &&
sqlcom_can_generate_row_events(this))
{
/*
@@ -6032,7 +6044,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
else
{
if (lex->is_stmt_unsafe() || lex->is_stmt_row_injection()
- || (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0)
+ || (flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0 ||
+ is_bulk_op())
{
/* log in row format! */
set_current_stmt_binlog_format_row_if_mixed();
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 994a161a646..93d3a4a0cd4 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2463,6 +2463,8 @@ public:
*/
Query_arena *stmt_arena;
+ void *bulk_param;
+
/*
map for tables that will be updated for a multi-table update query
statement, for other query statements, this will be zero.
@@ -3438,6 +3440,12 @@ public:
To raise this flag, use my_error().
*/
inline bool is_error() const { return m_stmt_da->is_error(); }
+ void set_bulk_execution(void *bulk)
+ {
+ bulk_param= bulk;
+ m_stmt_da->set_bulk_execution(MY_TEST(bulk));
+ }
+ bool is_bulk_op() const { return MY_TEST(bulk_param); }
/// Returns Diagnostics-area for the current statement.
Diagnostics_area *get_stmt_da()
@@ -5510,6 +5518,15 @@ public:
*/
#define CF_UPDATES_DATA (1U << 18)
+/**
+ SP Bulk execution safe
+*/
+#define CF_SP_BULK_SAFE (1U << 19)
+/**
+ SP Bulk execution optimized
+*/
+#define CF_SP_BULK_OPTIMIZED (1U << 20)
+
/* Bits in server_command_flags */
/**
diff --git a/sql/sql_error.cc b/sql/sql_error.cc
index 1d234c578e3..d14c7b83b77 100644
--- a/sql/sql_error.cc
+++ b/sql/sql_error.cc
@@ -320,7 +320,7 @@ Sql_condition::set_sqlstate(const char* sqlstate)
}
Diagnostics_area::Diagnostics_area(bool initialize)
- : m_main_wi(0, false, initialize)
+ : is_bulk_execution(0), m_main_wi(0, false, initialize)
{
push_warning_info(&m_main_wi);
@@ -330,7 +330,8 @@ Diagnostics_area::Diagnostics_area(bool initialize)
Diagnostics_area::Diagnostics_area(ulonglong warning_info_id,
bool allow_unlimited_warnings,
bool initialize)
- : m_main_wi(warning_info_id, allow_unlimited_warnings, initialize)
+ : is_bulk_execution(0),
+ m_main_wi(warning_info_id, allow_unlimited_warnings, initialize)
{
push_warning_info(&m_main_wi);
@@ -376,22 +377,33 @@ Diagnostics_area::set_ok_status(ulonglong affected_rows,
const char *message)
{
DBUG_ENTER("set_ok_status");
- DBUG_ASSERT(! is_set());
+ DBUG_ASSERT(!is_set() || (m_status == DA_OK_BULK && is_bulk_op()));
/*
In production, refuse to overwrite an error or a custom response
with an OK packet.
*/
if (is_error() || is_disabled())
return;
-
- m_statement_warn_count= current_statement_warn_count();
- m_affected_rows= affected_rows;
+ /*
+ When running a bulk operation, m_status will be DA_OK for the first
+ operation and set to DA_OK_BULK for all following operations.
+ */
+ if (m_status == DA_OK_BULK)
+ {
+ m_statement_warn_count+= current_statement_warn_count();
+ m_affected_rows+= affected_rows;
+ }
+ else
+ {
+ m_statement_warn_count= current_statement_warn_count();
+ m_affected_rows= affected_rows;
+ m_status= (is_bulk_op() ? DA_OK_BULK : DA_OK);
+ }
m_last_insert_id= last_insert_id;
if (message)
strmake_buf(m_message, message);
else
m_message[0]= '\0';
- m_status= DA_OK;
DBUG_VOID_RETURN;
}
diff --git a/sql/sql_error.h b/sql/sql_error.h
index 8fb1abacf2b..aa8e6c6b0f3 100644
--- a/sql/sql_error.h
+++ b/sql/sql_error.h
@@ -658,6 +658,8 @@ public:
DA_OK,
/** Set whenever one calls my_eof(). */
DA_EOF,
+ /** Set whenever one calls my_ok() in PS bulk mode. */
+ DA_OK_BULK,
/** Set whenever one calls my_error() or my_message(). */
DA_ERROR,
/** Set in case of a custom response, such as one from COM_STMT_PREPARE. */
@@ -699,13 +701,21 @@ public:
bool is_disabled() const { return m_status == DA_DISABLED; }
+ void set_bulk_execution(bool bulk) { is_bulk_execution= bulk; }
+
+ bool is_bulk_op() const { return is_bulk_execution; }
+
enum_diagnostics_status status() const { return m_status; }
const char *message() const
- { DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK); return m_message; }
+ { DBUG_ASSERT(m_status == DA_ERROR || m_status == DA_OK ||
+ m_status == DA_OK_BULK); return m_message; }
bool skip_flush() const
- { DBUG_ASSERT(m_status == DA_OK); return m_skip_flush; }
+ {
+ DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
+ return m_skip_flush;
+ }
void set_skip_flush()
{ m_skip_flush= TRUE; }
@@ -717,14 +727,21 @@ public:
{ DBUG_ASSERT(m_status == DA_ERROR); return m_sqlstate; }
ulonglong affected_rows() const
- { DBUG_ASSERT(m_status == DA_OK); return m_affected_rows; }
+ {
+ DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
+ return m_affected_rows;
+ }
ulonglong last_insert_id() const
- { DBUG_ASSERT(m_status == DA_OK); return m_last_insert_id; }
+ {
+ DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK);
+ return m_last_insert_id;
+ }
uint statement_warn_count() const
{
- DBUG_ASSERT(m_status == DA_OK || m_status == DA_EOF);
+ DBUG_ASSERT(m_status == DA_OK || m_status == DA_OK_BULK ||
+ m_status == DA_EOF);
return m_statement_warn_count;
}
@@ -907,6 +924,8 @@ private:
enum_diagnostics_status m_status;
+ my_bool is_bulk_execution;
+
Warning_info m_main_wi;
Warning_info_list m_wi_stack;
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 1abfd5925b8..432f8b67378 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -77,6 +77,7 @@
#include "transaction.h"
#include "sql_audit.h"
#include "sql_derived.h" // mysql_handle_derived
+#include "sql_prepare.h"
#include "debug_sync.h"
@@ -661,7 +662,9 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
bool using_bulk_insert= 0;
uint value_count;
ulong counter = 1;
+ ulong iteration= 0;
ulonglong id;
+ ulong bulk_iterations= bulk_parameters_iterations(thd);
COPY_INFO info;
TABLE *table= 0;
List_iterator_fast<List_item> its(values_list);
@@ -725,8 +728,11 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
THD_STAGE_INFO(thd, stage_init);
thd->lex->used_tables=0;
values= its++;
+ if (bulk_parameters_set(thd))
+ DBUG_RETURN(TRUE);
value_count= values->elements;
+ DBUG_ASSERT(bulk_iterations > 0);
if (mysql_prepare_insert(thd, table_list, table, fields, values,
update_fields, update_values, duplic, &unused_conds,
FALSE,
@@ -885,106 +891,114 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
goto values_loop_end;
}
}
-
- while ((values= its++))
+ do
{
- if (fields.elements || !value_count)
+ if (iteration && bulk_parameters_set(thd))
+ goto abort;
+
+ while ((values= its++))
{
- /*
- There are possibly some default values:
- INSERT INTO t1 (fields) VALUES ...
- INSERT INTO t1 VALUES ()
- */
- restore_record(table,s->default_values); // Get empty record
- table->reset_default_fields();
- if (fill_record_n_invoke_before_triggers(thd, table, fields, *values, 0,
- TRG_EVENT_INSERT))
+ if (fields.elements || !value_count)
{
- if (values_list.elements != 1 && ! thd->is_error())
- {
- info.records++;
- continue;
- }
- /*
- TODO: set thd->abort_on_warning if values_list.elements == 1
- and check that all items return warning in case of problem with
- storing field.
+ /*
+ There are possibly some default values:
+ INSERT INTO t1 (fields) VALUES ...
+ INSERT INTO t1 VALUES ()
*/
- error=1;
- break;
+ restore_record(table,s->default_values); // Get empty record
+ table->reset_default_fields();
+ if (fill_record_n_invoke_before_triggers(thd, table, fields, *values, 0,
+ TRG_EVENT_INSERT))
+ {
+ if (values_list.elements != 1 && ! thd->is_error())
+ {
+ info.records++;
+ continue;
+ }
+ /*
+ TODO: set thd->abort_on_warning if values_list.elements == 1
+ and check that all items return warning in case of problem with
+ storing field.
+ */
+ error=1;
+ break;
+ }
}
- }
- else
- {
- /*
- No field list, all fields are set explicitly:
- INSERT INTO t1 VALUES (values)
- */
- if (thd->lex->used_tables) // Column used in values()
- restore_record(table,s->default_values); // Get empty record
else
{
- TABLE_SHARE *share= table->s;
-
/*
- Fix delete marker. No need to restore rest of record since it will
- be overwritten by fill_record() anyway (and fill_record() does not
- use default values in this case).
+ No field list, all fields are set explicitly:
+ INSERT INTO t1 VALUES (values)
*/
-#ifdef HAVE_valgrind
- if (table->file->ha_table_flags() && HA_RECORD_MUST_BE_CLEAN_ON_WRITE)
- restore_record(table,s->default_values); // Get empty record
+ if (thd->lex->used_tables) // Column used in values()
+ restore_record(table,s->default_values); // Get empty record
else
+ {
+ TABLE_SHARE *share= table->s;
+
+ /*
+ Fix delete marker. No need to restore rest of record since it will
+ be overwritten by fill_record() anyway (and fill_record() does not
+ use default values in this case).
+ */
+#ifdef HAVE_valgrind
+ if (table->file->ha_table_flags() && HA_RECORD_MUST_BE_CLEAN_ON_WRITE)
+ restore_record(table,s->default_values); // Get empty record
+ else
#endif
- table->record[0][0]= share->default_values[0];
+ table->record[0][0]= share->default_values[0];
- /* Fix undefined null_bits. */
- if (share->null_bytes > 1 && share->last_null_bit_pos)
+ /* Fix undefined null_bits. */
+ if (share->null_bytes > 1 && share->last_null_bit_pos)
+ {
+ table->record[0][share->null_bytes - 1]=
+ share->default_values[share->null_bytes - 1];
+ }
+ }
+ if (fill_record_n_invoke_before_triggers(thd, table,
+ table->field_to_fill(),
+ *values, 0, TRG_EVENT_INSERT))
{
- table->record[0][share->null_bytes - 1]=
- share->default_values[share->null_bytes - 1];
+ if (values_list.elements != 1 && ! thd->is_error())
+ {
+ info.records++;
+ continue;
+ }
+ error=1;
+ break;
}
}
- if (fill_record_n_invoke_before_triggers(thd, table, table->field_to_fill(),
- *values, 0, TRG_EVENT_INSERT))
+
+ if ((res= table_list->view_check_option(thd,
+ (values_list.elements == 1 ?
+ 0 :
+ ignore))) ==
+ VIEW_CHECK_SKIP)
+ continue;
+ else if (res == VIEW_CHECK_ERROR)
{
- if (values_list.elements != 1 && ! thd->is_error())
- {
- info.records++;
- continue;
- }
- error=1;
- break;
+ error= 1;
+ break;
}
- }
-
- if ((res= table_list->view_check_option(thd,
- (values_list.elements == 1 ?
- 0 :
- ignore))) ==
- VIEW_CHECK_SKIP)
- continue;
- else if (res == VIEW_CHECK_ERROR)
- {
- error= 1;
- break;
- }
#ifndef EMBEDDED_LIBRARY
- if (lock_type == TL_WRITE_DELAYED)
- {
- LEX_STRING const st_query = { query, thd->query_length() };
- DEBUG_SYNC(thd, "before_write_delayed");
- error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
- DEBUG_SYNC(thd, "after_write_delayed");
- query=0;
- }
- else
+ if (lock_type == TL_WRITE_DELAYED)
+ {
+ LEX_STRING const st_query = { query, thd->query_length() };
+ DEBUG_SYNC(thd, "before_write_delayed");
+ error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
+ DEBUG_SYNC(thd, "after_write_delayed");
+ query=0;
+ }
+ else
#endif
- error=write_record(thd, table ,&info);
- if (error)
- break;
- thd->get_stmt_da()->inc_current_row_for_warning();
- }
+ error=write_record(thd, table ,&info);
+ if (error)
+ break;
+ thd->get_stmt_da()->inc_current_row_for_warning();
+ }
+ its.rewind();
+ iteration++;
+ } while (iteration < bulk_iterations);
values_loop_end:
free_underlaid_joins(thd, &thd->lex->select_lex);
@@ -1131,7 +1145,7 @@ values_loop_end:
retval= thd->lex->explain->send_explain(thd);
goto abort;
}
- if (values_list.elements == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
+ if ((bulk_iterations * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
!thd->cuted_fields))
{
my_ok(thd, info.copied + info.deleted +
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index ddb5e2744cb..74b5ac79ad1 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -570,17 +570,19 @@ void init_update_queries(void)
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
CF_CAN_BE_EXPLAINED |
- CF_UPDATES_DATA;
+ CF_UPDATES_DATA | CF_SP_BULK_SAFE;
sql_command_flags[SQLCOM_UPDATE_MULTI]= CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
CF_CAN_BE_EXPLAINED |
- CF_UPDATES_DATA;
+ CF_UPDATES_DATA | CF_SP_BULK_SAFE;
sql_command_flags[SQLCOM_INSERT]= CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
CF_CAN_BE_EXPLAINED |
- CF_INSERTS_DATA;
+ CF_INSERTS_DATA |
+ CF_SP_BULK_SAFE |
+ CF_SP_BULK_OPTIMIZED;
sql_command_flags[SQLCOM_INSERT_SELECT]= CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
@@ -598,7 +600,7 @@ void init_update_queries(void)
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
CF_CAN_BE_EXPLAINED |
- CF_INSERTS_DATA;;
+ CF_INSERTS_DATA | CF_SP_BULK_SAFE;
sql_command_flags[SQLCOM_REPLACE_SELECT]= CF_CHANGES_DATA | CF_REEXECUTION_FRAGILE |
CF_CAN_GENERATE_ROW_EVENTS |
CF_OPTIMIZER_TRACE |
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
index ac398338679..e191dae6f98 100644
--- a/sql/sql_prepare.cc
+++ b/sql/sql_prepare.cc
@@ -162,6 +162,9 @@ public:
Select_fetch_protocol_binary result;
Item_param **param_array;
Server_side_cursor *cursor;
+ uchar *packet;
+ uchar *packet_end;
+ ulong iterations;
uint param_count;
uint last_errno;
uint flags;
@@ -180,11 +183,15 @@ public:
*/
uint select_number_after_prepare;
char last_error[MYSQL_ERRMSG_SIZE];
+ my_bool start_param;
#ifndef EMBEDDED_LIBRARY
bool (*set_params)(Prepared_statement *st, uchar *data, uchar *data_end,
uchar *read_pos, String *expanded_query);
+ bool (*set_bulk_params)(Prepared_statement *st,
+ uchar **read_pos, uchar *data_end, bool reset);
#else
bool (*set_params_data)(Prepared_statement *st, String *expanded_query);
+ /*TODO: add bulk support for builtin server */
#endif
bool (*set_params_from_actual_params)(Prepared_statement *stmt,
List<Item> &list,
@@ -204,7 +211,13 @@ public:
bool execute_loop(String *expanded_query,
bool open_cursor,
uchar *packet_arg, uchar *packet_end_arg);
+ bool execute_bulk_loop(String *expanded_query,
+ bool open_cursor,
+ uchar *packet_arg, uchar *packet_end_arg,
+ ulong iterations);
bool execute_server_runnable(Server_runnable *server_runnable);
+ my_bool set_bulk_parameters(bool reset);
+ ulong bulk_iterations();
/* Destroy this statement */
void deallocate();
bool execute_immediate(const char *query, uint query_length);
@@ -962,11 +975,59 @@ static bool insert_params(Prepared_statement *stmt, uchar *null_array,
}
+static bool insert_bulk_params(Prepared_statement *stmt,
+ uchar **read_pos, uchar *data_end,
+ bool reset)
+{
+ Item_param **begin= stmt->param_array;
+ Item_param **end= begin + stmt->param_count;
+
+ DBUG_ENTER("insert_params");
+
+ for (Item_param **it= begin; it < end; ++it)
+ {
+ Item_param *param= *it;
+ if (reset)
+ param->reset();
+ if (param->state != Item_param::LONG_DATA_VALUE)
+ {
+ if (param->indicators)
+ param->indicator= (enum_indicator_type) *((*read_pos)++);
+ else
+ param->indicator= STMT_INDICATOR_NONE;
+ if ((*read_pos) > data_end)
+ DBUG_RETURN(1);
+ switch (param->indicator)
+ {
+ case STMT_INDICATOR_NONE:
+ if ((*read_pos) >= data_end)
+ DBUG_RETURN(1);
+ param->set_param_func(param, read_pos, (uint) (data_end - (*read_pos)));
+ if (param->state == Item_param::NO_VALUE)
+ DBUG_RETURN(1);
+ break;
+ case STMT_INDICATOR_NULL:
+ param->set_null();
+ break;
+ case STMT_INDICATOR_DEFAULT:
+ param->set_default();
+ break;
+ }
+ }
+ else
+ DBUG_RETURN(1); // long is not supported here
+ }
+ DBUG_RETURN(0);
+}
+
static bool setup_conversion_functions(Prepared_statement *stmt,
- uchar **data, uchar *data_end)
+ uchar **data, uchar *data_end,
+ bool bulk_protocol= 0)
{
/* skip null bits */
- uchar *read_pos= *data + (stmt->param_count+7) / 8;
+ uchar *read_pos= *data;
+ if (!bulk_protocol)
+ read_pos+= (stmt->param_count+7) / 8;
DBUG_ENTER("setup_conversion_functions");
@@ -983,6 +1044,7 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
{
ushort typecode;
const uint signed_bit= 1 << 15;
+ const uint indicators_bit= 1 << 14;
if (read_pos >= data_end)
DBUG_RETURN(1);
@@ -990,7 +1052,10 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
typecode= sint2korr(read_pos);
read_pos+= 2;
(**it).unsigned_flag= MY_TEST(typecode & signed_bit);
- setup_one_conversion_function(thd, *it, (uchar) (typecode & ~signed_bit));
+ if (bulk_protocol)
+ (**it).indicators= MY_TEST(typecode & indicators_bit);
+ setup_one_conversion_function(thd, *it,
+ (uchar) (typecode & 0xff));
}
}
*data= read_pos;
@@ -999,6 +1064,8 @@ static bool setup_conversion_functions(Prepared_statement *stmt,
#else
+//TODO: support bulk parameters
+
/**
Embedded counterparts of parameter assignment routines.
@@ -2996,6 +3063,7 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
uchar *packet= (uchar*)packet_arg; // GCC 4.0.1 workaround
ulong stmt_id= uint4korr(packet);
ulong flags= (ulong) packet[4];
+ ulong iterations= uint4korr(packet + 5);
/* Query text for binary, general or slow log, if any of them is open */
String expanded_query;
uchar *packet_end= packet + packet_length;
@@ -3021,12 +3089,16 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length)
thd->profiling.set_query_source(stmt->query(), stmt->query_length());
#endif
DBUG_PRINT("exec_query", ("%s", stmt->query()));
- DBUG_PRINT("info",("stmt: 0x%lx", (long) stmt));
+ DBUG_PRINT("info",("stmt: 0x%p iterations: %lu", stmt, iterations));
open_cursor= MY_TEST(flags & (ulong) CURSOR_TYPE_READ_ONLY);
thd->protocol= &thd->protocol_binary;
- stmt->execute_loop(&expanded_query, open_cursor, packet, packet_end);
+ if (iterations <= 1)
+ stmt->execute_loop(&expanded_query, open_cursor, packet, packet_end);
+ else
+ stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end,
+ iterations);
thd->protocol= save_protocol;
sp_cache_enforce_limit(thd->sp_proc_cache, stored_program_cache_size);
@@ -3531,9 +3603,13 @@ Prepared_statement::Prepared_statement(THD *thd_arg)
result(thd_arg),
param_array(0),
cursor(0),
+ packet(0),
+ packet_end(0),
+ iterations(0),
param_count(0),
last_errno(0),
- flags((uint) IS_IN_USE)
+ flags((uint) IS_IN_USE),
+ start_param(0)
{
init_sql_alloc(&main_mem_root, thd_arg->variables.query_alloc_block_size,
thd_arg->variables.query_prealloc_size, MYF(MY_THREAD_SPECIFIC));
@@ -3569,7 +3645,9 @@ void Prepared_statement::setup_set_params()
set_params_from_actual_params= insert_params_from_actual_params_with_log;
#ifndef EMBEDDED_LIBRARY
set_params= insert_params_with_log;
+ set_bulk_params= insert_bulk_params; // TODO: add binlog support
#else
+ //TODO: add bulk support for bulk parameters
set_params_data= emb_insert_params_with_log;
#endif
}
@@ -3578,7 +3656,9 @@ void Prepared_statement::setup_set_params()
set_params_from_actual_params= insert_params_from_actual_params;
#ifndef EMBEDDED_LIBRARY
set_params= insert_params;
+ set_bulk_params= insert_bulk_params;
#else
+ //TODO: add bulk support for bulk parameters
set_params_data= emb_insert_params;
#endif
}
@@ -3935,6 +4015,7 @@ Prepared_statement::set_parameters(String *expanded_query,
@retval FALSE successfully executed the statement, perhaps
after having reprepared it a few times.
*/
+const static int MAX_REPREPARE_ATTEMPTS= 3;
bool
Prepared_statement::execute_loop(String *expanded_query,
@@ -3942,10 +4023,10 @@ Prepared_statement::execute_loop(String *expanded_query,
uchar *packet,
uchar *packet_end)
{
- const int MAX_REPREPARE_ATTEMPTS= 3;
Reprepare_observer reprepare_observer;
bool error;
int reprepare_attempt= 0;
+ iterations= 0;
#ifndef DBUG_OFF
Item *free_list_state= thd->free_list;
#endif
@@ -4037,6 +4118,199 @@ reexecute:
return error;
}
+my_bool bulk_parameters_set(THD *thd)
+{
+ DBUG_ENTER("bulk_parameters_set");
+ Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param;
+
+ if (stmt && stmt->set_bulk_parameters(FALSE))
+ DBUG_RETURN(TRUE);
+ DBUG_RETURN(FALSE);
+}
+
+ulong bulk_parameters_iterations(THD *thd)
+{
+ Prepared_statement *stmt= (Prepared_statement *) thd->bulk_param;
+ if (!stmt)
+ return 1;
+ return stmt->bulk_iterations();
+}
+
+
+my_bool Prepared_statement::set_bulk_parameters(bool reset)
+{
+ DBUG_ENTER("Prepared_statement::set_bulk_parameters");
+ DBUG_PRINT("info", ("iteration: %lu", iterations));
+ if (iterations)
+ {
+#ifndef EMBEDDED_LIBRARY
+ if ((*set_bulk_params)(this, &packet, packet_end, reset))
+#else
+ // bulk parameters are not supported for embedded, so it will an error
+#endif
+ {
+ my_error(ER_WRONG_ARGUMENTS, MYF(0),
+ "mysqld_stmt_bulk_execute");
+ reset_stmt_params(this);
+ DBUG_RETURN(true);
+ }
+ iterations--;
+ }
+ start_param= 0;
+ DBUG_RETURN(false);
+}
+
+ulong Prepared_statement::bulk_iterations()
+{
+ if (iterations)
+ return iterations;
+ return start_param ? 1 : 0;
+}
+
+bool
+Prepared_statement::execute_bulk_loop(String *expanded_query,
+ bool open_cursor,
+ uchar *packet_arg,
+ uchar *packet_end_arg,
+ ulong iterations_arg)
+{
+ Reprepare_observer reprepare_observer;
+ bool error= 0;
+ packet= packet_arg;
+ packet_end= packet_end_arg;
+ iterations= iterations_arg;
+ start_param= true;
+#ifndef DBUG_OFF
+ Item *free_list_state= thd->free_list;
+#endif
+ thd->select_number= select_number_after_prepare;
+ thd->set_bulk_execution((void *)this);
+ /* Check if we got an error when sending long data */
+ if (state == Query_arena::STMT_ERROR)
+ {
+ my_message(last_errno, last_error, MYF(0));
+ thd->set_bulk_execution(0);
+ return TRUE;
+ }
+
+ if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_SAFE))
+ {
+ my_error(ER_UNSUPPORTED_PS, MYF(0));
+ thd->set_bulk_execution(0);
+ return TRUE;
+ }
+
+#ifndef EMBEDDED_LIBRARY
+ if (setup_conversion_functions(this, &packet, packet_end, TRUE))
+#else
+ // bulk parameters are not supported for embedded, so it will an error
+#endif
+ {
+ my_error(ER_WRONG_ARGUMENTS, MYF(0),
+ "mysqld_stmt_bulk_execute");
+ reset_stmt_params(this);
+ thd->set_bulk_execution(0);
+ return true;
+ }
+
+#ifdef NOT_YET_FROM_MYSQL_5_6
+ if (unlikely(thd->security_ctx->password_expired &&
+ !lex->is_change_password))
+ {
+ my_error(ER_MUST_CHANGE_PASSWORD, MYF(0));
+ thd->set_bulk_execution(0);
+ return true;
+ }
+#endif
+
+ // iterations changed by set_bulk_parameters
+ while ((iterations || start_param) && !error && !thd->is_error())
+ {
+ int reprepare_attempt= 0;
+
+ /*
+ Here we set parameters for not optimized commands,
+ optimized commands do it inside thier internal loop.
+ */
+ if (!(sql_command_flags[lex->sql_command] & CF_SP_BULK_OPTIMIZED))
+ {
+ if (set_bulk_parameters(TRUE))
+ {
+ thd->set_bulk_execution(0);
+ return true;
+ }
+ }
+
+reexecute:
+ /*
+ If the free_list is not empty, we'll wrongly free some externally
+ allocated items when cleaning up after validation of the prepared
+ statement.
+ */
+ DBUG_ASSERT(thd->free_list == free_list_state);
+
+ /*
+ Install the metadata observer. If some metadata version is
+ different from prepare time and an observer is installed,
+ the observer method will be invoked to push an error into
+ the error stack.
+ */
+
+ if (sql_command_flags[lex->sql_command] & CF_REEXECUTION_FRAGILE)
+ {
+ reprepare_observer.reset_reprepare_observer();
+ DBUG_ASSERT(thd->m_reprepare_observer == NULL);
+ thd->m_reprepare_observer= &reprepare_observer;
+ }
+
+ error= execute(expanded_query, open_cursor) || thd->is_error();
+
+ thd->m_reprepare_observer= NULL;
+#ifdef WITH_WSREP
+
+ if (WSREP_ON)
+ {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ switch (thd->wsrep_conflict_state)
+ {
+ case CERT_FAILURE:
+ WSREP_DEBUG("PS execute fail for CERT_FAILURE: thd: %lld err: %d",
+ (longlong) thd->thread_id,
+ thd->get_stmt_da()->sql_errno() );
+ thd->wsrep_conflict_state = NO_CONFLICT;
+ break;
+
+ case MUST_REPLAY:
+ (void) wsrep_replay_transaction(thd);
+ break;
+
+ default:
+ break;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif /* WITH_WSREP */
+
+ if ((sql_command_flags[lex->sql_command] & CF_REEXECUTION_FRAGILE) &&
+ error && !thd->is_fatal_error && !thd->killed &&
+ reprepare_observer.is_invalidated() &&
+ reprepare_attempt++ < MAX_REPREPARE_ATTEMPTS)
+ {
+ DBUG_ASSERT(thd->get_stmt_da()->sql_errno() == ER_NEED_REPREPARE);
+ thd->clear_error();
+
+ error= reprepare();
+
+ if (! error) /* Success */
+ goto reexecute;
+ }
+ }
+ reset_stmt_params(this);
+ thd->set_bulk_execution(0);
+
+ return error;
+}
+
bool
Prepared_statement::execute_server_runnable(Server_runnable *server_runnable)
diff --git a/sql/sql_prepare.h b/sql/sql_prepare.h
index 4a8780c4a02..820cb43e6d5 100644
--- a/sql/sql_prepare.h
+++ b/sql/sql_prepare.h
@@ -72,6 +72,7 @@ private:
void mysqld_stmt_prepare(THD *thd, const char *packet, uint packet_length);
void mysqld_stmt_execute(THD *thd, char *packet, uint packet_length);
+void mysqld_stmt_bulk_execute(THD *thd, char *packet, uint packet_length);
void mysqld_stmt_close(THD *thd, char *packet);
void mysql_sql_stmt_prepare(THD *thd);
void mysql_sql_stmt_execute(THD *thd);
@@ -82,6 +83,8 @@ void mysqld_stmt_reset(THD *thd, char *packet);
void mysql_stmt_get_longdata(THD *thd, char *pos, ulong packet_length);
void reinit_stmt_before_use(THD *thd, LEX *lex);
+ulong bulk_parameters_iterations(THD *thd);
+my_bool bulk_parameters_set(THD *thd);
/**
Execute a fragment of server code in an isolated context, so that
it doesn't leave any effect on THD. THD must have no open tables.
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index 06846e7dd6a..03e53dd3e97 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -265,7 +265,8 @@ void wsrep_replay_transaction(THD *thd)
}
else if (thd->get_stmt_da()->is_set())
{
- if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK)
+ if (thd->get_stmt_da()->status() != Diagnostics_area::DA_OK &&
+ thd->get_stmt_da()->status() != Diagnostics_area::DA_OK_BULK)
{
WSREP_WARN("replay ok, thd has error status %d",
thd->get_stmt_da()->status());
diff --git a/storage/perfschema/pfs.cc b/storage/perfschema/pfs.cc
index dbe3241cfe8..4cd1a35fd77 100644
--- a/storage/perfschema/pfs.cc
+++ b/storage/perfschema/pfs.cc
@@ -4827,6 +4827,7 @@ static void end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
switch(da->status())
{
+ case Diagnostics_area::DA_OK_BULK:
case Diagnostics_area::DA_EMPTY:
break;
case Diagnostics_area::DA_OK:
@@ -4960,6 +4961,7 @@ static void end_statement_v1(PSI_statement_locker *locker, void *stmt_da)
switch (da->status())
{
+ case Diagnostics_area::DA_OK_BULK:
case Diagnostics_area::DA_EMPTY:
break;
case Diagnostics_area::DA_OK: