diff options
Diffstat (limited to 'sql/sql_parse.cc')
-rw-r--r-- | sql/sql_parse.cc | 1152 |
1 files changed, 1145 insertions, 7 deletions
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index ea0a19fc5f3..888aebd52b2 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -104,6 +104,16 @@ #include "../storage/maria/ha_maria.h" #endif +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#include "rpl_rli.h" +static void wsrep_client_rollback(THD *thd); + +extern Format_description_log_event *wsrep_format_desc; + +static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, + Parser_state *parser_state); +#endif /* WITH_WSREP */ /** @defgroup Runtime_Environment Runtime Environment @{ @@ -596,6 +606,13 @@ bool is_log_table_write_query(enum enum_sql_command command) return (sql_command_flags[command] & CF_WRITE_LOGS_COMMAND) != 0; } +#ifdef WITH_WSREP +bool is_show_query(enum enum_sql_command command) +{ + DBUG_ASSERT(command >= 0 && command <= SQLCOM_END); + return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0; +} +#endif void execute_init_command(THD *thd, LEX_STRING *init_command, mysql_rwlock_t *var_lock) { @@ -792,8 +809,12 @@ void do_handle_bootstrap(THD *thd) if (my_thread_init() || thd->store_globals()) { #ifndef EMBEDDED_LIBRARY +#ifdef WITH_WSREP + close_connection(thd, ER_OUT_OF_RESOURCES, 1); +#else close_connection(thd, ER_OUT_OF_RESOURCES); #endif +#endif thd->fatal_error(); goto end; } @@ -866,7 +887,18 @@ bool do_command(THD *thd) NET *net= &thd->net; enum enum_server_command command; DBUG_ENTER("do_command"); - +#ifdef WITH_WSREP + if (WSREP(thd)) + { + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_IDLE; + if (thd->wsrep_conflict_state==MUST_ABORT) + { + wsrep_client_rollback(thd); + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } +#endif /* indicator of uninitialized lex => normal flow of errors handling (see my_message_sql) @@ -915,6 +947,30 @@ bool do_command(THD *thd) thd->m_server_idle= TRUE; packet_length= my_net_read(net); thd->m_server_idle= FALSE; +#ifdef WITH_WSREP + if (WSREP(thd)) { + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + + /* these THD's are aborted or are aborting during being idle */ + if (thd->wsrep_conflict_state == ABORTING) + { + while (thd->wsrep_conflict_state == ABORTING) { + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + my_sleep(1000); + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + } + thd->store_globals(); + } + else if (thd->wsrep_conflict_state == ABORTED) + { + thd->store_globals(); + thd->wsrep_bf_thd = NULL; + } + + thd->wsrep_query_state= QUERY_EXEC; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } +#endif /* WITH_WSREP */ if (packet_length == packet_error) { @@ -922,6 +978,17 @@ bool do_command(THD *thd) net->error, vio_description(net->vio))); +#ifdef WITH_WSREP + if (WSREP(thd)) { + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state == MUST_ABORT) + { + DBUG_PRINT("wsrep",("aborted for wsrep rollback: %lu", thd->real_id)); + wsrep_client_rollback(thd); + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } +#endif /* Instrument this broken statement as "statement/com/error" */ thd->m_statement_psi= MYSQL_REFINE_STATEMENT(thd->m_statement_psi, com_statement_info[COM_END]. @@ -976,12 +1043,54 @@ bool do_command(THD *thd) vio_description(net->vio), command, command_name[command].str)); +#ifdef WITH_WSREP + if (WSREP(thd)) { + /* + * bail out if DB snapshot has not been installed. We however, + * allow queries "SET" and "SHOW", they are trapped later in execute_command + */ + if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready && + command != COM_QUERY && + command != COM_PING && + command != COM_QUIT && + command != COM_PROCESS_INFO && + command != COM_PROCESS_KILL && + command != COM_SET_OPTION && + command != COM_SHUTDOWN && + command != COM_SLEEP && + command != COM_STATISTICS && + command != COM_TIME && + command != COM_END + ) { + my_error(ER_UNKNOWN_COM_ERROR, MYF(0), + "WSREP has not yet prepared node for application use"); + thd->protocol->end_statement(); + return_value= FALSE; + goto out; + } + } +#endif /* Restore read timeout value */ my_net_set_read_timeout(net, thd->variables.net_read_timeout); DBUG_ASSERT(packet_length); return_value= dispatch_command(command, thd, packet+1, (uint) (packet_length-1)); - +#ifdef WITH_WSREP + if (WSREP(thd)) { + while (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT) + { + return_value= dispatch_command(command, thd, thd->wsrep_retry_query, + thd->wsrep_retry_query_len); + } + } + if (thd->wsrep_retry_query && thd->wsrep_conflict_state != REPLAYING) + { + my_free(thd->wsrep_retry_query); + thd->wsrep_retry_query = NULL; + thd->wsrep_retry_query_len = 0; + thd->wsrep_retry_command = COM_CONNECT; + } +#endif out: /* The statement instrumentation must be closed in all cases. */ DBUG_ASSERT(thd->m_statement_psi == NULL); @@ -1057,6 +1166,33 @@ static my_bool deny_updates_if_read_only_option(THD *thd, DBUG_RETURN(FALSE); } +#ifdef WITH_WSREP +static my_bool wsrep_read_only_option(THD *thd, TABLE_LIST *all_tables) +{ + int opt_readonly_saved = opt_readonly; + ulong flag_saved = (ulong)(thd->security_ctx->master_access & SUPER_ACL); + + opt_readonly = 0; + thd->security_ctx->master_access &= ~SUPER_ACL; + + my_bool ret = !deny_updates_if_read_only_option(thd, all_tables); + + opt_readonly = opt_readonly_saved; + thd->security_ctx->master_access |= flag_saved; + + return ret; +} + +static void wsrep_copy_query(THD *thd) +{ + thd->wsrep_retry_command = thd->get_command(); + thd->wsrep_retry_query_len = thd->query_length(); + thd->wsrep_retry_query = (char *)my_malloc( + thd->wsrep_retry_query_len + 1, MYF(0)); + strncpy(thd->wsrep_retry_query, thd->query(), thd->wsrep_retry_query_len); + thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0'; +} +#endif /* WITH_WSREP */ /** Perform one connection-level (COM_XXXX) command. @@ -1086,6 +1222,43 @@ bool dispatch_command(enum enum_server_command command, THD *thd, DBUG_ENTER("dispatch_command"); DBUG_PRINT("info", ("command: %d", command)); +#ifdef WITH_WSREP + if (WSREP(thd)) { + if (!thd->in_multi_stmt_transaction_mode()) + { + thd->wsrep_PA_safe= true; + } + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_EXEC; + if (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT) + { + thd->wsrep_conflict_state= NO_CONFLICT; + } + if (thd->wsrep_conflict_state== MUST_ABORT) + { + wsrep_client_rollback(thd); + } + if (thd->wsrep_conflict_state== ABORTED) + { + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + WSREP_DEBUG("Deadlock error for: %s", thd->query()); + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + thd->killed = NOT_KILLED; + thd->mysys_var->abort = 0; + thd->wsrep_conflict_state = NO_CONFLICT; + thd->wsrep_retry_counter = 0; + thd->wsrep_bf_thd = NULL; + /* + Increment threads running to compensate dec_thread_running() called + after dispatch_end label. + */ + inc_thread_running(); + goto dispatch_end; + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } +#endif /* WITH_WSREP */ #if defined(ENABLED_PROFILING) thd->profiling.start_new_query(); #endif @@ -1271,7 +1444,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd, if (parser_state.init(thd, thd->query(), thd->query_length())) break; +#ifdef WITH_WSREP + wsrep_mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); +#else mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); +#endif while (!thd->killed && (parser_state.m_lip.found_semicolon != NULL) && ! thd->is_error()) @@ -1337,10 +1514,19 @@ bool dispatch_command(enum enum_server_command command, THD *thd, Count each statement from the client. */ statistic_increment(thd->status_var.questions, &LOCK_status); +#ifdef WITH_WSREP + if (!WSREP(thd)) + thd->set_time(); /* Reset the query start time. */ +#else thd->set_time(); /* Reset the query start time. */ +#endif parser_state.reset(beginning_of_next_stmt, length); /* TODO: set thd->lex->sql_command to SQLCOM_END here */ +#ifdef WITH_WSREP + wsrep_mysql_parse(thd, beginning_of_next_stmt, length, &parser_state); +#else mysql_parse(thd, beginning_of_next_stmt, length, &parser_state); +#endif } DBUG_PRINT("info",("query ready")); @@ -1659,6 +1845,23 @@ bool dispatch_command(enum enum_server_command command, THD *thd, my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0)); break; } +#ifdef WITH_WSREP + dispatch_end: + + if (WSREP(thd)) { + /* wsrep BF abort in query exec phase */ + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if ((thd->wsrep_conflict_state != REPLAYING) && + (thd->wsrep_conflict_state != RETRY_AUTOCOMMIT)) + { + thd->update_server_status(); + thd->protocol->end_statement(); + query_cache_end_of_result(thd); + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + } else { /* if (WSREP(thd))... */ +#endif /* WITH_WSREP */ DBUG_ASSERT(thd->derived_tables == NULL && (thd->open_tables == NULL || (thd->locked_tables_mode == LTM_LOCK_TABLES))); @@ -1668,6 +1871,9 @@ bool dispatch_command(enum enum_server_command command, THD *thd, thd->update_server_status(); thd->protocol->end_statement(); query_cache_end_of_result(thd); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ if (!thd->is_error() && !thd->killed_errno()) mysql_audit_general(thd, MYSQL_AUDIT_GENERAL_RESULT, 0, 0); @@ -2332,7 +2538,66 @@ mysql_execute_command(THD *thd) #ifdef HAVE_REPLICATION } /* endif unlikely slave */ #endif +#ifdef WITH_WSREP + if (WSREP(thd)) { + /* + change LOCK TABLE WRITE to transaction + */ + if (lex->sql_command== SQLCOM_LOCK_TABLES && wsrep_convert_LOCK_to_trx) + { + for (TABLE_LIST *table= all_tables; table; table= table->next_global) + { + if (table->lock_type >= TL_WRITE_ALLOW_WRITE) + { + lex->sql_command= SQLCOM_BEGIN; + thd->wsrep_converted_lock_session= true; + break; + } + } + } + if (lex->sql_command== SQLCOM_UNLOCK_TABLES && + thd->wsrep_converted_lock_session) + { + thd->wsrep_converted_lock_session= false; + lex->sql_command= SQLCOM_COMMIT; + lex->tx_release= TVL_NO; + } + /* + * bail out if DB snapshot has not been installed. We however, + * allow SET and SHOW queries + */ + if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready && + lex->sql_command != SQLCOM_SET_OPTION && + !is_show_query(lex->sql_command)) + { +#if DIRTY_HACK + /* Dirty hack for lp:1002714 - trying to recognize mysqldump connection + * and allow it to continue. Actuall mysqldump_magic_str may be longer + * and is obviously version dependent and may be issued by any client + * connection after which connection becomes non-replicating. */ + static char const mysqldump_magic_str[]= +"SELECT LOGFILE_GROUP_NAME, FILE_NAME, TOTAL_EXTENTS, INITIAL_SIZE, ENGINE, EXTRA FROM INFORMATION_SCHEMA.FILES WHERE FILE_TYPE = 'UNDO LOG' AND FILE_NAME IS NOT NULL"; + static const size_t mysqldump_magic_str_len= sizeof(mysqldump_magic_str) -1; + if (SQLCOM_SELECT != lex->sql_command || + thd->query_length() < mysqldump_magic_str_len || + strncmp(thd->query(), mysqldump_magic_str, mysqldump_magic_str_len)) + { +#endif /* DIRTY_HACK */ + my_error(ER_UNKNOWN_COM_ERROR, MYF(0), + "WSREP has not yet prepared node for application use"); + goto error; +#if DIRTY_HACK + } + else + { + /* mysqldump connection, allow all further queries to pass */ + thd->variables.wsrep_on= FALSE; + } +#endif /* DIRTY_HACK */ + } + } +#endif /* WITH_WSREP */ status_var_increment(thd->status_var.com_stat[lex->sql_command]); thd->progress.report_to_client= test(sql_command_flags[lex->sql_command] & CF_REPORT_PROGRESS); @@ -2404,6 +2669,9 @@ mysql_execute_command(THD *thd) #endif case SQLCOM_SHOW_STATUS_PROC: case SQLCOM_SHOW_STATUS_FUNC: +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; +#endif /* WITH_WSREP */ if ((res= check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE))) goto error; @@ -2449,17 +2717,27 @@ mysql_execute_command(THD *thd) case SQLCOM_SHOW_PLUGINS: case SQLCOM_SHOW_FIELDS: case SQLCOM_SHOW_KEYS: +#ifndef WITH_WSREP case SQLCOM_SHOW_VARIABLES: case SQLCOM_SHOW_CHARSETS: case SQLCOM_SHOW_COLLATIONS: case SQLCOM_SHOW_STORAGE_ENGINES: case SQLCOM_SHOW_PROFILE: +#endif /* WITH_WSREP */ case SQLCOM_SHOW_CLIENT_STATS: case SQLCOM_SHOW_USER_STATS: case SQLCOM_SHOW_TABLE_STATS: case SQLCOM_SHOW_INDEX_STATS: case SQLCOM_SELECT: - { +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; + case SQLCOM_SHOW_VARIABLES: + case SQLCOM_SHOW_CHARSETS: + case SQLCOM_SHOW_COLLATIONS: + case SQLCOM_SHOW_STORAGE_ENGINES: + case SQLCOM_SHOW_PROFILE: +#endif /* WITH_WSREP */ + { thd->status_var.last_query_cost= 0.0; /* @@ -2815,7 +3093,7 @@ case SQLCOM_PREPARE: */ if (thd->query_name_consts && mysql_bin_log.is_open() && - thd->variables.binlog_format == BINLOG_FORMAT_STMT && + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT && !mysql_bin_log.is_query_in_union(thd, thd->query_id)) { List_iterator_fast<Item> it(select_lex->item_list); @@ -2926,6 +3204,12 @@ case SQLCOM_PREPARE: if (create_info.options & HA_LEX_CREATE_TMP_TABLE) thd->variables.option_bits|= OPTION_KEEP_LOG; /* regular create */ +#ifdef WITH_WSREP + if (!thd->is_current_stmt_binlog_format_row() || + !(create_info.options & HA_LEX_CREATE_TMP_TABLE)) + WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name, + NULL) +#endif /* WITH_WSREP */ if (create_info.options & HA_LEX_CREATE_TABLE_LIKE) { /* CREATE TABLE ... LIKE ... */ @@ -2967,6 +3251,7 @@ end_with_restore_list: DBUG_ASSERT(first_table == all_tables && first_table != 0); if (check_one_table_access(thd, INDEX_ACL, all_tables)) goto error; /* purecov: inspected */ + WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name, NULL) /* Currently CREATE INDEX or DROP INDEX cause a full table rebuild and thus classify as slow administrative statements just like @@ -3063,6 +3348,7 @@ end_with_restore_list: #endif /* HAVE_REPLICATION */ case SQLCOM_RENAME_TABLE: { + WSREP_TO_ISOLATION_BEGIN(0, 0, first_table) if (execute_rename_table(thd, first_table, all_tables)) goto error; break; @@ -3090,6 +3376,10 @@ end_with_restore_list: goto error; #else { +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; +#endif /* WITH_WSREP */ + /* Access check: SHOW CREATE TABLE require any privileges on the table level (ie @@ -3145,6 +3435,10 @@ end_with_restore_list: case SQLCOM_CHECKSUM: { DBUG_ASSERT(first_table == all_tables && first_table != 0); +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; +#endif /* WITH_WSREP */ + if (check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE)) goto error; /* purecov: inspected */ @@ -3342,6 +3636,15 @@ end_with_restore_list: DBUG_ASSERT(first_table == all_tables && first_table != 0); if ((res= insert_precheck(thd, all_tables))) break; +#ifdef WITH_WSREP + if (lex->sql_command == SQLCOM_INSERT_SELECT && + thd->wsrep_consistency_check == CONSISTENCY_CHECK_DECLARED) + { + thd->wsrep_consistency_check = CONSISTENCY_CHECK_RUNNING; + WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name, NULL); + } + +#endif /* INSERT...SELECT...ON DUPLICATE KEY UPDATE/REPLACE SELECT/ INSERT...IGNORE...SELECT can be unsafe, unless ORDER BY PRIMARY KEY @@ -3505,6 +3808,18 @@ end_with_restore_list: /* So that DROP TEMPORARY TABLE gets to binlog at commit/rollback */ thd->variables.option_bits|= OPTION_KEEP_LOG; } +#ifdef WITH_WSREP + for (TABLE_LIST *table= all_tables; table; table= table->next_global) + { + if (!lex->drop_temporary && + (!thd->is_current_stmt_binlog_format_row() || + !find_temporary_table(thd, table))) + { + WSREP_TO_ISOLATION_BEGIN(NULL, NULL, all_tables); + break; + } + } +#endif /* WITH_WSREP */ /* DDL and binlog write order are protected by metadata locks. */ res= mysql_rm_table(thd, first_table, lex->check_exists, lex->drop_temporary); @@ -3548,7 +3863,6 @@ end_with_restore_list: if (!mysql_change_db(thd, &db_str, FALSE)) my_ok(thd); - break; } @@ -3691,6 +4005,7 @@ end_with_restore_list: #endif if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0)) break; + WSREP_TO_ISOLATION_BEGIN(lex->name.str, NULL, NULL) res= mysql_create_db(thd,(lower_case_table_names == 2 ? alias : lex->name.str), &create_info, 0); break; @@ -3720,6 +4035,7 @@ end_with_restore_list: #endif if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0)) break; + WSREP_TO_ISOLATION_BEGIN(lex->name.str, NULL, NULL) res= mysql_rm_db(thd, lex->name.str, lex->check_exists, 0); break; } @@ -3748,6 +4064,7 @@ end_with_restore_list: res= 1; break; } + WSREP_TO_ISOLATION_BEGIN(db->str, NULL, NULL) res= mysql_upgrade_db(thd, db); if (!res) my_ok(thd); @@ -3780,6 +4097,7 @@ end_with_restore_list: #endif if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0)) break; + WSREP_TO_ISOLATION_BEGIN(db->str, NULL, NULL) res= mysql_alter_db(thd, db->str, &create_info); break; } @@ -3812,6 +4130,7 @@ end_with_restore_list: if (res) break; + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) switch (lex->sql_command) { case SQLCOM_CREATE_EVENT: { @@ -3846,6 +4165,7 @@ end_with_restore_list: lex->spname->m_name); break; case SQLCOM_DROP_EVENT: + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) if (!(res= Events::drop_event(thd, lex->spname->m_db, lex->spname->m_name, lex->check_exists))) @@ -3860,6 +4180,7 @@ end_with_restore_list: if (check_access(thd, INSERT_ACL, "mysql", NULL, NULL, 1, 0)) break; #ifdef HAVE_DLOPEN + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) if (!(res = mysql_create_function(thd, &lex->udf))) my_ok(thd); #else @@ -3874,6 +4195,7 @@ end_with_restore_list: if (check_access(thd, INSERT_ACL, "mysql", NULL, NULL, 1, 1) && check_global_access(thd,CREATE_USER_ACL)) break; + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) /* Conditionally writes to binlog */ if (!(res= mysql_create_user(thd, lex->users_list))) my_ok(thd); @@ -3885,6 +4207,7 @@ end_with_restore_list: check_global_access(thd,CREATE_USER_ACL)) break; /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) if (!(res= mysql_drop_user(thd, lex->users_list))) my_ok(thd); break; @@ -3895,6 +4218,7 @@ end_with_restore_list: check_global_access(thd,CREATE_USER_ACL)) break; /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) if (!(res= mysql_rename_user(thd, lex->users_list))) my_ok(thd); break; @@ -3909,6 +4233,7 @@ end_with_restore_list: thd->binlog_invoker(); /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) if (!(res = mysql_revoke_all(thd, lex->users_list))) my_ok(thd); break; @@ -3975,6 +4300,7 @@ end_with_restore_list: lex->type == TYPE_ENUM_PROCEDURE, 0)) goto error; /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) res= mysql_routine_grant(thd, all_tables, lex->type == TYPE_ENUM_PROCEDURE, lex->users_list, grants, @@ -3988,6 +4314,7 @@ end_with_restore_list: all_tables, FALSE, UINT_MAX, FALSE)) goto error; /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) res= mysql_table_grant(thd, all_tables, lex->users_list, lex->columns, lex->grant, lex->sql_command == SQLCOM_REVOKE); @@ -4003,6 +4330,7 @@ end_with_restore_list: } else { + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) /* Conditionally writes to binlog */ res = mysql_grant(thd, select_lex->db, lex->users_list, lex->grant, lex->sql_command == SQLCOM_REVOKE, @@ -4141,9 +4469,17 @@ end_with_restore_list: able to open it (with SQLCOM_HA_OPEN) in the first place. */ unit->set_limit(select_lex); +#ifdef WITH_WSREP + { char* tmp_info= NULL; + if (WSREP(thd)) tmp_info = (char *)thd_proc_info(thd, "mysql_ha_read()"); +#endif /* WITH_WSREP */ res= mysql_ha_read(thd, first_table, lex->ha_read_mode, lex->ident.str, lex->insert_list, lex->ha_rkey_mode, select_lex->where, unit->select_limit_cnt, unit->offset_limit_cnt); +#ifdef WITH_WSREP + if (WSREP(thd)) thd_proc_info(thd, tmp_info); + } +#endif /* WITH_WSREP */ break; case SQLCOM_BEGIN: @@ -4213,8 +4549,20 @@ end_with_restore_list: /* Disconnect the current client connection. */ if (tx_release) thd->killed= KILL_CONNECTION; - my_ok(thd); - break; + #ifdef WITH_WSREP + if (WSREP(thd)) { + if (thd->wsrep_conflict_state == NO_CONFLICT || + thd->wsrep_conflict_state == REPLAYING) + { + my_ok(thd); + } + } else { +#endif /* WITH_WSREP */ + my_ok(thd); + #ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ + break; } case SQLCOM_RELEASE_SAVEPOINT: if (trans_release_savepoint(thd, lex->ident)) @@ -4282,6 +4630,7 @@ end_with_restore_list: if (sp_process_definer(thd)) goto create_sp_error; + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) res= (sp_result= sp_create_routine(thd, lex->sphead->m_type, lex->sphead)); switch (sp_result) { case SP_OK: { @@ -4493,6 +4842,7 @@ create_sp_error: already puts on CREATE FUNCTION. */ /* Conditionally writes to binlog */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) sp_result= sp_update_routine(thd, type, lex->spname, &lex->sp_chistics); switch (sp_result) { @@ -4564,6 +4914,7 @@ create_sp_error: if (check_routine_access(thd, ALTER_PROC_ACL, db, name, lex->sql_command == SQLCOM_DROP_PROCEDURE, 0)) goto error; + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) /* Conditionally writes to binlog */ sp_result= sp_drop_routine(thd, type, lex->spname); @@ -4681,6 +5032,7 @@ create_sp_error: Note: SQLCOM_CREATE_VIEW also handles 'ALTER VIEW' commands as specified through the thd->lex->create_view_mode flag. */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) res= mysql_create_view(thd, first_table, thd->lex->create_view_mode); break; } @@ -4689,12 +5041,14 @@ create_sp_error: if (check_table_access(thd, DROP_ACL, all_tables, FALSE, UINT_MAX, FALSE)) goto error; /* Conditionally writes to binlog. */ + WSREP_TO_ISOLATION_BEGIN(NULL, NULL, NULL) res= mysql_drop_view(thd, first_table, thd->lex->drop_mode); break; } case SQLCOM_CREATE_TRIGGER: { /* Conditionally writes to binlog. */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) res= mysql_create_or_drop_trigger(thd, all_tables, 1); break; @@ -4702,6 +5056,7 @@ create_sp_error: case SQLCOM_DROP_TRIGGER: { /* Conditionally writes to binlog. */ + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) res= mysql_create_or_drop_trigger(thd, all_tables, 0); break; } @@ -4754,11 +5109,13 @@ create_sp_error: my_ok(thd); break; case SQLCOM_INSTALL_PLUGIN: + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) if (! (res= mysql_install_plugin(thd, &thd->lex->comment, &thd->lex->ident))) my_ok(thd); break; case SQLCOM_UNINSTALL_PLUGIN: + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) if (! (res= mysql_uninstall_plugin(thd, &thd->lex->comment, &thd->lex->ident))) my_ok(thd); @@ -4919,6 +5276,9 @@ finish: /* Free tables */ THD_STAGE_INFO(thd, stage_closing_tables); close_thread_tables(thd); +#ifdef WITH_WSREP + thd->wsrep_consistency_check= NO_CONSISTENCY_CHECK; +#endif /* WITH_WSREP */ #ifndef DBUG_OFF if (lex->sql_command != SQLCOM_SET_OPTION && ! thd->in_sub_stmt) @@ -4959,6 +5319,7 @@ finish: { thd->mdl_context.release_statement_locks(); } + WSREP_TO_ISOLATION_END DBUG_RETURN(res || thd->is_error()); } @@ -5027,6 +5388,9 @@ static bool execute_sqlcom_select(THD *thd, TABLE_LIST *all_tables) if (!thd->get_sent_row_count()) status_var_increment(thd->status_var.empty_queries); status_var_add(thd->status_var.rows_sent, thd->get_sent_row_count()); +#ifdef WITH_WSREP + if (lex->sql_command == SQLCOM_SHOW_STATUS) wsrep_free_status(thd); +#endif /* WITH_WSREP */ return res; } @@ -5847,6 +6211,21 @@ void THD::reset_for_next_command(bool calculate_userstat) thd->auto_inc_intervals_in_cur_stmt_for_binlog.empty(); thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0; +#ifdef WITH_WSREP + if (WSREP(thd)) { + if (wsrep_auto_increment_control) + { + if (thd->variables.auto_increment_offset != + global_system_variables.auto_increment_offset) + thd->variables.auto_increment_offset= + global_system_variables.auto_increment_offset; + if (thd->variables.auto_increment_increment != + global_system_variables.auto_increment_increment) + thd->variables.auto_increment_increment= + global_system_variables.auto_increment_increment; + } + } +#endif /* WITH_WSREP */ thd->query_start_used= 0; thd->query_start_sec_part_used= 0; thd->is_fatal_error= thd->time_zone_used= 0; @@ -6056,6 +6435,179 @@ void mysql_init_multi_delete(LEX *lex) lex->query_tables_last= &lex->query_tables; } +#ifdef WITH_WSREP +void wsrep_replay_transaction(THD *thd) +{ + /* checking if BF trx must be replayed */ + if (thd->wsrep_conflict_state== MUST_REPLAY) + { + if (thd->wsrep_exec_mode!= REPL_RECV) + { + if (thd->stmt_da->is_sent) + { + WSREP_ERROR("replay issue, thd has reported status already"); + } + thd->stmt_da->reset_diagnostics_area(); + + thd->wsrep_conflict_state= REPLAYING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + mysql_reset_thd_for_next_command(thd, opt_userstat_running); + thd->killed= NOT_KILLED; + close_thread_tables(thd); + if (thd->locked_tables_mode && thd->lock) + { + WSREP_DEBUG("releasing table lock for replaying (%ld)", + thd->thread_id); + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + } + thd->mdl_context.release_transactional_locks(); + + thd_proc_info(thd, "wsrep replaying trx"); + WSREP_DEBUG("replay trx: %s %lld", + thd->query() ? thd->query() : "void", + (long long)thd->wsrep_trx_seqno); + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(thd, &shadow); + int rcode = wsrep->replay_trx(wsrep, + &thd->wsrep_trx_handle, + (void *)thd); + + wsrep_return_from_bf_mode(thd, &shadow); + if (thd->wsrep_conflict_state!= REPLAYING) + WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + + switch (rcode) + { + case WSREP_OK: + thd->wsrep_conflict_state= NO_CONFLICT; + wsrep->post_commit(wsrep, &thd->wsrep_trx_handle); + WSREP_DEBUG("trx_replay successful for: %ld %llu", + thd->thread_id, (long long)thd->real_id); + break; + case WSREP_TRX_FAIL: + if (thd->stmt_da->is_sent) + { + WSREP_ERROR("replay failed, thd has reported status"); + } + else + { + WSREP_DEBUG("replay failed, rolling back"); + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + } + thd->wsrep_conflict_state= ABORTED; + thd->wsrep_bf_thd = NULL; + wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle); + break; + default: + WSREP_ERROR("trx_replay failed for: %d, query: %s", + rcode, thd->query() ? thd->query() : "void"); + /* we're now in inconsistent state, must abort */ + unireg_abort(1); + break; + } + mysql_mutex_lock(&LOCK_wsrep_replaying); + wsrep_replaying--; + WSREP_DEBUG("replaying decreased: %d, thd: %lu", + wsrep_replaying, thd->thread_id); + mysql_cond_broadcast(&COND_wsrep_replaying); + mysql_mutex_unlock(&LOCK_wsrep_replaying); + } + } +} + +static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, + Parser_state *parser_state) +{ + bool is_autocommit= + !thd->in_multi_stmt_transaction_mode() && + thd->wsrep_conflict_state == NO_CONFLICT && + !thd->wsrep_applier && + wsrep_read_only_option(thd, thd->lex->query_tables); + + do + { + if (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT) + { + thd->wsrep_conflict_state= NO_CONFLICT; + } + mysql_parse(thd, rawbuf, length, parser_state); + + if (WSREP(thd)) { + /* wsrep BF abort in query exec phase */ + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state == MUST_ABORT) { + wsrep_client_rollback(thd); + + WSREP_DEBUG("abort in exec query state, avoiding autocommit"); + } + + if (thd->wsrep_conflict_state== MUST_REPLAY) + { + wsrep_replay_transaction(thd); + } + + /* setting error code for BF aborted trxs */ + if (thd->wsrep_conflict_state == ABORTED || + thd->wsrep_conflict_state == CERT_FAILURE) + { + mysql_reset_thd_for_next_command(thd, opt_userstat_running); + thd->killed= NOT_KILLED; + if (is_autocommit && + thd->lex->sql_command != SQLCOM_SELECT && + (thd->wsrep_retry_counter < thd->variables.wsrep_retry_autocommit)) + { + WSREP_DEBUG("wsrep retrying AC query: %s", + (thd->query()) ? thd->query() : "void"); + + close_thread_tables(thd); + + thd->wsrep_conflict_state= RETRY_AUTOCOMMIT; + thd->wsrep_retry_counter++; // grow + wsrep_copy_query(thd); + thd->set_time(); + parser_state->reset(rawbuf, length); + } + else + { + WSREP_DEBUG("%s, thd: %lu is_AC: %d, retry: %lu - %lu SQL: %s", + (thd->wsrep_conflict_state == ABORTED) ? + "BF Aborted" : "cert failure", + thd->thread_id, is_autocommit, thd->wsrep_retry_counter, + thd->variables.wsrep_retry_autocommit, thd->query()); + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + thd->killed= NOT_KILLED; + thd->wsrep_conflict_state= NO_CONFLICT; + if (thd->wsrep_conflict_state != REPLAYING) + thd->wsrep_retry_counter= 0; // reset + } + } + else + { + set_if_smaller(thd->wsrep_retry_counter, 0); // reset; eventually ok + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + } + } while (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT); + + if (thd->wsrep_retry_query) + { + WSREP_DEBUG("releasing retry_query: conf %d sent %d kill %d errno %d SQL %s", + thd->wsrep_conflict_state, + thd->stmt_da->is_sent, + thd->killed, + thd->stmt_da->is_error() ? thd->stmt_da->sql_errno() : 0, + thd->wsrep_retry_query); + my_free(thd->wsrep_retry_query); + thd->wsrep_retry_query = NULL; + thd->wsrep_retry_query_len = 0; + thd->wsrep_retry_command = COM_CONNECT; + } +} +#endif /* WITH_WSREP */ /* When you modify mysql_parse(), you may need to mofify @@ -7063,8 +7615,14 @@ uint kill_one_thread(THD *thd, ulong id, killed_state kill_signal) faster and do a harder kill than KILL_SYSTEM_THREAD; */ +#ifdef WITH_WSREP + if (((thd->security_ctx->master_access & SUPER_ACL) || + thd->security_ctx->user_matches(tmp->security_ctx)) && + !wsrep_thd_is_brute_force((void *)tmp)) +#else if ((thd->security_ctx->master_access & SUPER_ACL) || thd->security_ctx->user_matches(tmp->security_ctx)) +#endif /* WITH_WSREP */ { tmp->awake(kill_signal); error=0; @@ -7839,6 +8397,586 @@ LEX_USER *create_definer(THD *thd, LEX_STRING *user_name, LEX_STRING *host_name) return definer; } +#ifdef WITH_WSREP +/* must have (&thd->LOCK_wsrep_thd) */ +static void wsrep_client_rollback(THD *thd) +{ + WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s", + thd->thread_id, thd->query()); + + thd->wsrep_conflict_state= ABORTING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + trans_rollback(thd); + + if (thd->locked_tables_mode && thd->lock) + { + WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id); + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + } + + if (thd->global_read_lock.is_acquired()) + { + WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id); + thd->global_read_lock.unlock_global_read_lock(thd); + } + + /* Release transactional metadata locks. */ + thd->mdl_context.release_transactional_locks(); + + if (thd->get_binlog_table_maps()) + { + WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id); + thd->clear_binlog_table_maps(); + } + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_conflict_state= ABORTED; + thd->wsrep_bf_thd = NULL; +} + +static enum wsrep_status wsrep_apply_sql( + THD *thd, const char *sql, size_t sql_len, time_t timeval, uint32 randseed) +{ + int error; + enum wsrep_status ret_code= WSREP_OK; + + DBUG_ENTER("wsrep_bf_execute_cb"); + thd->wsrep_exec_mode= REPL_RECV; + thd->net.vio= 0; + thd->start_time= timeval; + thd->wsrep_rand= randseed; + + thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT; + + DBUG_PRINT("wsrep", ("SQL: %s", sql)); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_EXEC; + /* preserve replaying mode */ + if (thd->wsrep_conflict_state!= REPLAYING) + thd->wsrep_conflict_state= NO_CONFLICT; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + if ((error= dispatch_command(COM_QUERY, thd, (char*)sql, sql_len))) { + WSREP_WARN("BF SQL apply failed: %d, %lld", + thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno); + DBUG_RETURN(WSREP_FATAL); + } + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state!= NO_CONFLICT && + thd->wsrep_conflict_state!= REPLAYING) { + ret_code= WSREP_FATAL; + WSREP_DEBUG("BF thd ending, with: %d, %lld", + thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno); + } + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + assert(thd->wsrep_exec_mode== REPL_RECV); + DBUG_RETURN(ret_code); +} + +void wsrep_write_rbr_buf( + THD *thd, const void* rbr_buf, size_t buf_len) +{ + char filename[PATH_MAX]= {0}; + int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log", + wsrep_data_home_dir, thd->thread_id, + (long long)thd->wsrep_trx_seqno); + if (len >= PATH_MAX) + { + WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len); + return; + } + + FILE *of= fopen(filename, "wb"); + if (of) + { + fwrite (rbr_buf, buf_len, 1, of); + fclose(of); + } + else + { + WSREP_ERROR("Failed to open file '%s': %d (%s)", + filename, errno, strerror(errno)); + } +} + +static inline wsrep_status_t wsrep_apply_rbr( + THD *thd, const uchar *rbr_buf, size_t buf_len) +{ + char *buf= (char *)rbr_buf; + int rcode= 0; + int event= 1; + Format_description_log_event *description_event = wsrep_format_desc; + DBUG_ENTER("wsrep_apply_rbr"); + + if (thd->killed == KILL_CONNECTION) + { + WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld", + (long long) thd->wsrep_trx_seqno); + DBUG_RETURN(WSREP_FATAL); + } + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_EXEC; + if (thd->wsrep_conflict_state!= REPLAYING) + thd->wsrep_conflict_state= NO_CONFLICT; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", + (long long) thd->wsrep_trx_seqno); + + if ((rcode= trans_begin(thd))) + WSREP_WARN("begin for rbr apply failed: %lld, code: %d", + (long long) thd->wsrep_trx_seqno, rcode); + + while(buf_len) + { + int exec_res; + int error = 0; + Log_event* ev= wsrep_read_log_event(&buf, &buf_len, description_event); + + if (!ev) + { + WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %ld", + (long long)thd->wsrep_trx_seqno, buf_len); + rcode= 1; + goto error; + } + switch (ev->get_type_code()) { + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + DBUG_ASSERT(buf_len != 0 || + ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F)); + break; + case FORMAT_DESCRIPTION_EVENT: + description_event = (Format_description_log_event *)ev; + break; + default: + break; + } + + thd->variables.server_id = ev->server_id; // use the original server id for logging + thd->set_time(); // time the query + wsrep_xid_init(&thd->transaction.xid_state.xid, + wsrep_cluster_uuid(), + thd->wsrep_trx_seqno); + thd->lex->current_select= 0; + if (!ev->when) + ev->when = time(NULL); + ev->thd = thd; + exec_res = ev->apply_event(thd->wsrep_rli); + DBUG_PRINT("info", ("exec_event result: %d", exec_res)); + + if (exec_res) + { + WSREP_WARN("RBR event %d %s apply warning: %d, %lld", + event, ev->get_type_str(), exec_res, (long long) thd->wsrep_trx_seqno); + rcode= exec_res; + /* stop processing for the first error */ + delete ev; + goto error; + } + event++; + + if (thd->wsrep_conflict_state!= NO_CONFLICT && + thd->wsrep_conflict_state!= REPLAYING) + WSREP_WARN("conflict state after RBR event applying: %d, %lld", + thd->wsrep_query_state, (long long)thd->wsrep_trx_seqno); + + if (thd->wsrep_conflict_state == MUST_ABORT) { + WSREP_WARN("RBR event apply failed, rolling back: %lld", + (long long) thd->wsrep_trx_seqno); + trans_rollback(thd); + thd->locked_tables_list.unlock_locked_tables(thd); + /* Release transactional metadata locks. */ + thd->mdl_context.release_transactional_locks(); + thd->wsrep_conflict_state= NO_CONFLICT; + DBUG_RETURN(WSREP_FATAL); + } + + if (ev->get_type_code() != TABLE_MAP_EVENT && + ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F)) + { + // TODO: combine with commit on higher level common for the query ws + + thd->wsrep_rli->cleanup_context(thd, 0); + + if (error == 0) + { + thd->clear_error(); + } + else + WSREP_ERROR("Error in %s event: commit of row events failed: %lld", + ev->get_type_str(), (long long)thd->wsrep_trx_seqno); + } + + if (description_event != ev) + delete ev; + } + + error: + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_IDLE; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + assert(thd->wsrep_exec_mode== REPL_RECV); + + if (thd->killed == KILL_CONNECTION) + WSREP_INFO("applier aborted: %lld", (long long)thd->wsrep_trx_seqno); + + if (rcode) DBUG_RETURN(WSREP_FATAL); + DBUG_RETURN(WSREP_OK); +} + +wsrep_status_t wsrep_apply_cb(void* const ctx, + const void* const buf, size_t const buf_len, + wsrep_seqno_t const global_seqno) +{ + THD* const thd((THD*)ctx); + + thd->wsrep_trx_seqno= global_seqno; + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "applying write set %lld: %p, %zu", + (long long)thd->wsrep_trx_seqno, buf, buf_len); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "applying write set"); +#endif /* WSREP_PROC_INFO */ + + wsrep_status_t const rcode(wsrep_apply_rbr(thd, (const uchar*)buf, buf_len)); + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "applied write set %lld", (long long)thd->wsrep_trx_seqno); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "applied write set"); +#endif /* WSREP_PROC_INFO */ + + if (WSREP_OK != rcode) wsrep_write_rbr_buf(thd, buf, buf_len); + + return rcode; +} + +#if DELETE // this does not work in 5.5 +/* a common wrapper for end_trans() function - to put all necessary stuff */ +static inline wsrep_status_t +wsrep_end_trans (THD* const thd, enum enum_mysql_completiontype const end) +{ + if (0 == end_trans(thd, end)) + { + return WSREP_OK; + } + else + { + return WSREP_FATAL; + } +} +#endif + +wsrep_status_t wsrep_commit(THD* const thd, wsrep_seqno_t const global_seqno) +{ +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "committing %lld", (long long)thd->wsrep_trx_seqno); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "committing"); +#endif /* WSREP_PROC_INFO */ + + wsrep_status_t const rcode(wsrep_apply_sql(thd, "COMMIT", 6, 0, 0)); +// wsrep_status_t const rcode(wsrep_end_trans (thd, COMMIT)); + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "committed %lld", (long long)thd->wsrep_trx_seqno); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "committed"); +#endif /* WSREP_PROC_INFO */ + + if (WSREP_OK == rcode) + { + // TODO: mark snapshot with global_seqno. + } + + return rcode; +} + +wsrep_status_t wsrep_rollback(THD* const thd, wsrep_seqno_t const global_seqno) +{ +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "rolling back %lld", (long long)thd->wsrep_trx_seqno); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "rolling back"); +#endif /* WSREP_PROC_INFO */ + + wsrep_status_t const rcode(wsrep_apply_sql(thd, "ROLLBACK", 8, 0, 0)); +// wsrep_status_t const rcode(wsrep_end_trans (thd, ROLLBACK)); + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "rolled back %lld", (long long)thd->wsrep_trx_seqno); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "rolled back"); +#endif /* WSREP_PROC_INFO */ + + return rcode; +} + +wsrep_status_t wsrep_commit_cb(void* const ctx, + wsrep_seqno_t const global_seqno, + bool const commit) +{ + THD* const thd((THD*)ctx); + + assert(global_seqno == thd->wsrep_trx_seqno); + + if (commit) + return wsrep_commit(thd, global_seqno); + else + return wsrep_rollback(thd, global_seqno); +} + +Relay_log_info* wsrep_relay_log_init(const char* log_fname) +{ + Relay_log_info* rli= new Relay_log_info(false); + + rli->no_storage= true; + if (!rli->relay_log.description_event_for_exec) + { + rli->relay_log.description_event_for_exec= + new Format_description_log_event(4); + } + + rli->sql_thd= current_thd; + return rli; +} + +void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) +{ + shadow->options = thd->variables.option_bits; + shadow->wsrep_exec_mode = thd->wsrep_exec_mode; + shadow->vio = thd->net.vio; + + if (opt_log_slave_updates) + thd->variables.option_bits|= OPTION_BIN_LOG; + else + thd->variables.option_bits&= ~(OPTION_BIN_LOG); + + if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay"); + + thd->wsrep_exec_mode= REPL_RECV; + thd->net.vio= 0; + thd->clear_error(); + + thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT; + + shadow->tx_isolation = thd->variables.tx_isolation; + thd->variables.tx_isolation = ISO_READ_COMMITTED; + thd->tx_isolation = ISO_READ_COMMITTED; +} + +void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) +{ + thd->variables.option_bits = shadow->options; + thd->wsrep_exec_mode = shadow->wsrep_exec_mode; + thd->net.vio = shadow->vio; + thd->variables.tx_isolation = shadow->tx_isolation; +} + +void wsrep_replication_process(THD *thd) +{ + int rcode; + DBUG_ENTER("wsrep_replication_process"); + + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(thd, &shadow); + + rcode = wsrep->recv(wsrep, (void *)thd); + DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode)); + + WSREP_INFO("applier thread exiting (code:%d)", rcode); + + switch (rcode) { + case WSREP_OK: + case WSREP_NOT_IMPLEMENTED: + case WSREP_CONN_FAIL: + /* provider does not support slave operations / disconnected from group, + * just close applier thread */ + break; + case WSREP_NODE_FAIL: + /* data inconsistency => SST is needed */ + /* Note: we cannot just blindly restart replication here, + * SST might require server restart if storage engines must be + * initialized after SST */ + WSREP_ERROR("node consistency compromised, aborting"); + wsrep_kill_mysql(thd); + break; + case WSREP_WARNING: + case WSREP_TRX_FAIL: + case WSREP_TRX_MISSING: + /* these suggests a bug in provider code */ + WSREP_WARN("bad return from recv() call: %d", rcode); + /* fall through to node shutdown */ + case WSREP_FATAL: + /* Cluster connectivity is lost. + * + * If applier was killed on purpose (KILL_CONNECTION), we + * avoid mysql shutdown. This is because the killer will then handle + * shutdown processing (or replication restarting) + */ + if (thd->killed != KILL_CONNECTION) + { + wsrep_kill_mysql(thd); + } + break; + } + + if (thd->killed != KILL_CONNECTION) + { + mysql_mutex_lock(&LOCK_thread_count); + wsrep_close_applier(thd); + mysql_cond_broadcast(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + } + wsrep_return_from_bf_mode(thd, &shadow); + DBUG_VOID_RETURN; +} + +void wsrep_rollback_process(THD *thd) +{ + DBUG_ENTER("wsrep_rollback_process"); + + mysql_mutex_lock(&LOCK_wsrep_rollback); + wsrep_aborting_thd= NULL; + + while (thd->killed == NOT_KILLED) { + thd_proc_info(thd, "wsrep aborter idle"); + thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; + thd->mysys_var->current_cond= &COND_wsrep_rollback; + + mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback); + + WSREP_DEBUG("WSREP rollback thread wakes for signal"); + + mysql_mutex_lock(&thd->mysys_var->mutex); + thd_proc_info(thd, "wsrep aborter active"); + thd->mysys_var->current_mutex= 0; + thd->mysys_var->current_cond= 0; + mysql_mutex_unlock(&thd->mysys_var->mutex); + + /* check for false alarms */ + if (!wsrep_aborting_thd) + { + WSREP_DEBUG("WSREP rollback thread has empty abort queue"); + } + /* process all entries in the queue */ + while (wsrep_aborting_thd) { + THD *aborting; + wsrep_aborting_thd_t next = wsrep_aborting_thd->next; + aborting = wsrep_aborting_thd->aborting_thd; + my_free(wsrep_aborting_thd); + wsrep_aborting_thd= next; + /* + * must release mutex, appliers my want to add more + * aborting thds in our work queue, while we rollback + */ + mysql_mutex_unlock(&LOCK_wsrep_rollback); + + mysql_mutex_lock(&aborting->LOCK_wsrep_thd); + if (aborting->wsrep_conflict_state== ABORTED) + { + WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d", + (long long)aborting->real_id, + aborting->wsrep_conflict_state); + + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + mysql_mutex_lock(&LOCK_wsrep_rollback); + continue; + } + aborting->wsrep_conflict_state= ABORTING; + + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + + aborting->store_globals(); + + mysql_mutex_lock(&aborting->LOCK_wsrep_thd); + wsrep_client_rollback(aborting); + WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)", + aborting->thread_id, (long long)aborting->real_id); + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + + mysql_mutex_lock(&LOCK_wsrep_rollback); + } + } + + mysql_mutex_unlock(&LOCK_wsrep_rollback); + sql_print_information("WSREP: rollbacker thread exiting"); + + DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting")); + DBUG_VOID_RETURN; +} +extern +int wsrep_thd_is_brute_force(void *thd_ptr) +{ + if (thd_ptr) { + switch (((THD *)thd_ptr)->wsrep_exec_mode) { + case LOCAL_STATE: + { + if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING) + { + return 1; + } + return 0; + } + case REPL_RECV: return 1; + case TOTAL_ORDER: return 2; + case LOCAL_COMMIT: return 3; + } + } + return 0; +} +extern "C" +int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) +{ + THD *victim_thd = (THD *) victim_thd_ptr; + THD *bf_thd = (THD *) bf_thd_ptr; + DBUG_ENTER("wsrep_abort_thd"); + + if ( (WSREP(bf_thd) || + ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) && + bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && + victim_thd) + { + WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? + (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); + ha_wsrep_abort_transaction(bf_thd, victim_thd, signal); + } + else + { + WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd); + } + + DBUG_RETURN(1); +} +extern "C" +int wsrep_thd_in_locking_session(void *thd_ptr) +{ + if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) { + return 1; + } + return 0; +} +#endif /** Retuns information about user or current user. |