diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 247 |
1 files changed, 144 insertions, 103 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 4792a2c9f0e..b06336669f3 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -54,7 +54,6 @@ #include "rpl_constants.h" #include "sql_digest.h" #include "zlib.h" -#include "my_atomic.h" #define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1)) @@ -102,16 +101,11 @@ TYPELIB binlog_checksum_typelib= TODO: correct the constant when it has been determined (which main tree to push and when) */ -const uchar checksum_version_split_mysql[3]= {5, 6, 1}; -const ulong checksum_version_product_mysql= - (checksum_version_split_mysql[0] * 256 + - checksum_version_split_mysql[1]) * 256 + - checksum_version_split_mysql[2]; -const uchar checksum_version_split_mariadb[3]= {5, 3, 0}; -const ulong checksum_version_product_mariadb= - (checksum_version_split_mariadb[0] * 256 + - checksum_version_split_mariadb[1]) * 256 + - checksum_version_split_mariadb[2]; +const Version checksum_version_split_mysql(5, 6, 1); +const Version checksum_version_split_mariadb(5, 3, 0); + +// First MySQL version with fraction seconds +const Version fsp_version_split_mysql(5, 6, 0); #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD* thd); @@ -1639,8 +1633,11 @@ int Log_event_writer::encrypt_and_write(const uchar *pos, size_t len) return 1; uint dstlen; - if (encryption_ctx_update(ctx, pos, (uint)len, dst, &dstlen)) + if (len == 0) + dstlen= 0; + else if (encryption_ctx_update(ctx, pos, (uint)len, dst, &dstlen)) goto err; + if (maybe_write_event_len(dst, dstlen)) return 1; pos= dst; @@ -1858,8 +1855,16 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, { uchar iv[BINLOG_IV_LENGTH]; fdle->crypto_data.set_iv(iv, (uint32) (my_b_tell(file) - data_len)); - - char *newpkt= (char*)my_malloc(data_len + ev_offset + 1, MYF(MY_WME)); + size_t sz= data_len + ev_offset + 1; +#ifdef HAVE_WOLFSSL + /* + Workaround for MDEV-19582. + WolfSSL reads memory out of bounds with decryption/NOPAD) + We allocate a little more memory therefore. + */ + sz += MY_AES_BLOCK_SIZE; +#endif + char *newpkt= (char*)my_malloc(sz, MYF(MY_WME)); if (!newpkt) DBUG_RETURN(LOG_READ_MEM); memcpy(newpkt, packet->ptr(), ev_offset); @@ -2761,9 +2766,7 @@ log_event_print_value(IO_CACHE *file, PRINT_EVENT_INFO *print_event_info, goto return_null; uint bin_size= my_decimal_get_binary_size(precision, decimals); - my_decimal dec; - binary2my_decimal(E_DEC_FATAL_ERROR, (uchar*) ptr, &dec, - precision, decimals); + my_decimal dec((const uchar *) ptr, precision, decimals); int length= DECIMAL_MAX_STR_LENGTH; char buff[DECIMAL_MAX_STR_LENGTH + 1]; decimal2string(&dec, buff, &length, 0, 0, 0); @@ -4336,9 +4339,14 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, size_t que /* If Query_log_event will contain non trans keyword (not BEGIN, COMMIT, SAVEPOINT or ROLLBACK) we disable PA for this transaction. + Note that here WSREP(thd) might not be true e.g. when wsrep_shcema + is created we create tables with thd->variables.wsrep_on=false + to avoid replicating wsrep_schema tables to other nodes. */ if (WSREP_ON && !is_trans_keyword()) + { thd->wsrep_PA_safe= false; + } #endif /* WITH_WSREP */ memset(&user, 0, sizeof(user)); @@ -4420,41 +4428,44 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, size_t que bool trx_cache= FALSE; cache_type= Log_event::EVENT_INVALID_CACHE; - switch (lex->sql_command) + if (!direct) { - case SQLCOM_DROP_TABLE: - case SQLCOM_DROP_SEQUENCE: - use_cache= (lex->tmp_table() && thd->in_multi_stmt_transaction_mode()); - break; + switch (lex->sql_command) + { + case SQLCOM_DROP_TABLE: + case SQLCOM_DROP_SEQUENCE: + use_cache= (lex->tmp_table() && thd->in_multi_stmt_transaction_mode()); + break; - case SQLCOM_CREATE_TABLE: - case SQLCOM_CREATE_SEQUENCE: - /* - If we are using CREATE ... SELECT or if we are a slave - executing BEGIN...COMMIT (generated by CREATE...SELECT) we - have to use the transactional cache to ensure we don't - calculate any checksum for the CREATE part. - */ - trx_cache= (lex->select_lex.item_list.elements && - thd->is_current_stmt_binlog_format_row()) || - (thd->variables.option_bits & OPTION_GTID_BEGIN); - use_cache= (lex->tmp_table() && - thd->in_multi_stmt_transaction_mode()) || trx_cache; - break; - case SQLCOM_SET_OPTION: - if (lex->autocommit) - use_cache= trx_cache= FALSE; - else - use_cache= TRUE; - break; - case SQLCOM_RELEASE_SAVEPOINT: - case SQLCOM_ROLLBACK_TO_SAVEPOINT: - case SQLCOM_SAVEPOINT: - use_cache= trx_cache= TRUE; - break; - default: - use_cache= sqlcom_can_generate_row_events(thd); - break; + case SQLCOM_CREATE_TABLE: + case SQLCOM_CREATE_SEQUENCE: + /* + If we are using CREATE ... SELECT or if we are a slave + executing BEGIN...COMMIT (generated by CREATE...SELECT) we + have to use the transactional cache to ensure we don't + calculate any checksum for the CREATE part. + */ + trx_cache= (lex->first_select_lex()->item_list.elements && + thd->is_current_stmt_binlog_format_row()) || + (thd->variables.option_bits & OPTION_GTID_BEGIN); + use_cache= (lex->tmp_table() && + thd->in_multi_stmt_transaction_mode()) || trx_cache; + break; + case SQLCOM_SET_OPTION: + if (lex->autocommit) + use_cache= trx_cache= FALSE; + else + use_cache= TRUE; + break; + case SQLCOM_RELEASE_SAVEPOINT: + case SQLCOM_ROLLBACK_TO_SAVEPOINT: + case SQLCOM_SAVEPOINT: + use_cache= trx_cache= TRUE; + break; + default: + use_cache= sqlcom_can_generate_row_events(thd); + break; + } } if (!use_cache || direct) @@ -4801,6 +4812,30 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, } } +#if !defined(MYSQL_CLIENT) + if (description_event->server_version_split.kind == + Format_description_log_event::master_version_split::KIND_MYSQL) + { + // Handle MariaDB/MySQL incompatible sql_mode bits + sql_mode_t mysql_sql_mode= sql_mode; + sql_mode&= MODE_MASK_MYSQL_COMPATIBLE; // Unset MySQL specific bits + + /* + sql_mode flags related to fraction second rounding/truncation + have opposite meaning in MySQL vs MariaDB. + MySQL: + - rounds fractional seconds by default + - truncates if TIME_TRUNCATE_FRACTIONAL is set + MariaDB: + - truncates fractional seconds by default + - rounds if TIME_ROUND_FRACTIONAL is set + */ + if (description_event->server_version_split >= fsp_version_split_mysql && + !(mysql_sql_mode & MODE_MYSQL80_TIME_TRUNCATE_FRACTIONAL)) + sql_mode|= MODE_TIME_ROUND_FRACTIONAL; + } +#endif + /** Layout for the data buffer is as follows +--------+-----------+------+------+---------+----+-------+ @@ -5635,7 +5670,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, gtid= rgi->current_gtid; if (unlikely(rpl_global_gtid_slave_state->record_gtid(thd, >id, sub_id, - rgi, false, + true, false, &hton))) { int errcode= thd->get_stmt_da()->sql_errno(); @@ -5815,6 +5850,14 @@ compare_errors: "unexpected success or fatal error"), thd->get_db(), query_arg); thd->is_slave_error= 1; +#ifdef WITH_WSREP + if (wsrep_thd_is_toi(thd) && wsrep_must_ignore_error(thd)) + { + thd->clear_error(1); + thd->killed= NOT_KILLED; + thd->wsrep_has_ignored_error= true; + } +#endif /* WITH_WSREP */ } /* @@ -6610,26 +6653,24 @@ bool Format_description_log_event::start_decryption(Start_encryption_log_event* return crypto_data.init(sele->crypto_scheme, sele->key_version); } -static inline void -do_server_version_split(char* version, - Format_description_log_event::master_version_split *split_versions) + +Version::Version(const char *version, const char **endptr) { - char *p= version, *r; + const char *p= version; ulong number; for (uint i= 0; i<=2; i++) { + char *r; number= strtoul(p, &r, 10); /* It is an invalid version if any version number greater than 255 or first number is not followed by '.'. */ if (number < 256 && (*r == '.' || i != 0)) - split_versions->ver[i]= (uchar) number; + m_ver[i]= (uchar) number; else { - split_versions->ver[0]= 0; - split_versions->ver[1]= 0; - split_versions->ver[2]= 0; + *this= Version(); break; } @@ -6637,12 +6678,19 @@ do_server_version_split(char* version, if (*r == '.') p++; // skip the dot } + endptr[0]= p; +} + + +Format_description_log_event:: + master_version_split::master_version_split(const char *version) +{ + const char *p; + static_cast<Version*>(this)[0]= Version(version, &p); if (strstr(p, "MariaDB") != 0 || strstr(p, "-maria-") != 0) - split_versions->kind= - Format_description_log_event::master_version_split::KIND_MARIADB; + kind= KIND_MARIADB; else - split_versions->kind= - Format_description_log_event::master_version_split::KIND_MYSQL; + kind= KIND_MYSQL; } @@ -6656,20 +6704,14 @@ do_server_version_split(char* version, */ void Format_description_log_event::calc_server_version_split() { - do_server_version_split(server_version, &server_version_split); + server_version_split= master_version_split(server_version); DBUG_PRINT("info",("Format_description_log_event::server_version_split:" " '%s' %d %d %d", server_version, - server_version_split.ver[0], - server_version_split.ver[1], server_version_split.ver[2])); + server_version_split[0], + server_version_split[1], server_version_split[2])); } -static inline ulong -version_product(const Format_description_log_event::master_version_split* version_split) -{ - return ((version_split->ver[0] * 256 + version_split->ver[1]) * 256 - + version_split->ver[2]); -} /** @return TRUE is the event's version is earlier than one that introduced @@ -6679,9 +6721,9 @@ bool Format_description_log_event::is_version_before_checksum(const master_version_split *version_split) { - return version_product(version_split) < + return *version_split < (version_split->kind == master_version_split::KIND_MARIADB ? - checksum_version_product_mariadb : checksum_version_product_mysql); + checksum_version_split_mariadb : checksum_version_split_mysql); } /** @@ -6697,7 +6739,6 @@ enum enum_binlog_checksum_alg get_checksum_alg(const char* buf, ulong len) { enum enum_binlog_checksum_alg ret; char version[ST_SERVER_VER_LEN]; - Format_description_log_event::master_version_split version_split; DBUG_ENTER("get_checksum_alg"); DBUG_ASSERT(buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT); @@ -6707,7 +6748,7 @@ enum enum_binlog_checksum_alg get_checksum_alg(const char* buf, ulong len) ST_SERVER_VER_LEN); version[ST_SERVER_VER_LEN - 1]= 0; - do_server_version_split(version, &version_split); + Format_description_log_event::master_version_split version_split(version); ret= Format_description_log_event::is_version_before_checksum(&version_split) ? BINLOG_CHECKSUM_ALG_UNDEF : (enum_binlog_checksum_alg)buf[len - BINLOG_CHECKSUM_LEN - BINLOG_CHECKSUM_ALG_DESC_LEN]; @@ -7458,8 +7499,9 @@ int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi, ex.skip_lines = skip_lines; List<Item> field_list; - thd->lex->select_lex.context.resolve_in_table_list_only(&tables); - set_fields(tables.db.str, field_list, &thd->lex->select_lex.context); + thd->lex->first_select_lex()->context.resolve_in_table_list_only(&tables); + set_fields(tables.db.str, + field_list, &thd->lex->first_select_lex()->context); thd->variables.pseudo_thread_id= thread_id; if (net) { @@ -8098,16 +8140,13 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) switch (flags2 & (FL_DDL | FL_TRANSACTIONAL)) { case FL_TRANSACTIONAL: - my_atomic_add64_explicit((volatile int64 *)&mi->total_trans_groups, 1, - MY_MEMORY_ORDER_RELAXED); + mi->total_trans_groups++; break; case FL_DDL: - my_atomic_add64_explicit((volatile int64 *)&mi->total_ddl_groups, 1, - MY_MEMORY_ORDER_RELAXED); + mi->total_ddl_groups++; break; default: - my_atomic_add64_explicit((volatile int64 *)&mi->total_non_trans_groups, 1, - MY_MEMORY_ORDER_RELAXED); + mi->total_non_trans_groups++; } if (flags2 & FL_STANDALONE) @@ -8440,7 +8479,7 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi) { if ((ret= rpl_global_gtid_slave_state->record_gtid(thd, &list[i], sub_id_list[i], - NULL, false, &hton))) + false, false, &hton))) return ret; rpl_global_gtid_slave_state->update_state_hash(sub_id_list[i], &list[i], hton, NULL); @@ -8977,7 +9016,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) rgi->gtid_pending= false; gtid= rgi->current_gtid; - err= rpl_global_gtid_slave_state->record_gtid(thd, >id, sub_id, rgi, + err= rpl_global_gtid_slave_state->record_gtid(thd, >id, sub_id, true, false, &hton); if (unlikely(err)) { @@ -9009,10 +9048,9 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) thd->variables.option_bits&= ~OPTION_GTID_BEGIN; res= trans_commit(thd); /* Automatically rolls back on error. */ thd->mdl_context.release_transactional_locks(); - #ifdef WITH_WSREP if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data); - if ((!res || (WSREP(thd) && thd->wsrep_conflict_state == MUST_REPLAY)) && sub_id) + if ((!res || (WSREP(thd) && thd->wsrep_trx().state() == wsrep::transaction::s_must_replay )) && sub_id) #else if (likely(!res) && sub_id) #endif /* WITH_WSREP */ @@ -9020,7 +9058,6 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) #ifdef WITH_WSREP if (WSREP(thd)) mysql_mutex_unlock(&thd->LOCK_thd_data); #endif /* WITH_WSREP */ - /* Increment the global status commit count variable */ @@ -9123,11 +9160,8 @@ void User_var_log_event::pack_info(Protocol* protocol) String buf(buf_mem, sizeof(buf_mem), system_charset_info); char buf2[DECIMAL_MAX_STR_LENGTH+1]; String str(buf2, sizeof(buf2), &my_charset_bin); - my_decimal dec; buf.length(0); - binary2my_decimal(E_DEC_FATAL_ERROR, (uchar*) (val+2), &dec, val[0], - val[1]); - my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str); + my_decimal((const uchar *) (val + 2), val[0], val[1]).to_string(&str); if (user_var_append_name_part(protocol->thd, &buf, name, name_len) || buf.append(buf2)) return; @@ -11366,13 +11400,13 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) { WSREP_WARN("BF applier failed to open_and_lock_tables: %u, fatal: %d " "wsrep = (exec_mode: %d conflict_state: %d seqno: %lld)", - thd->get_stmt_da()->sql_errno(), - thd->is_fatal_error, - thd->wsrep_exec_mode, - thd->wsrep_conflict_state, - (long long)wsrep_thd_trx_seqno(thd)); + thd->get_stmt_da()->sql_errno(), + thd->is_fatal_error, + thd->wsrep_cs().mode(), + thd->wsrep_trx().state(), + (long long) wsrep_thd_trx_seqno(thd)); } -#endif +#endif /* WITH_WSREP */ if (thd->is_error() && !is_parallel_retry_error(rgi, error= thd->get_stmt_da()->sql_errno())) { @@ -11508,10 +11542,10 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) #ifdef HAVE_QUERY_CACHE #ifdef WITH_WSREP /* - Moved invalidation right before the call to rows_event_stmt_cleanup(), - to avoid query cache being polluted with stale entries. + Moved invalidation right before the call to rows_event_stmt_cleanup(), + to avoid query cache being polluted with stale entries, */ - if (! (WSREP(thd) && (thd->wsrep_exec_mode == REPL_RECV))) + if (! (WSREP(thd) && wsrep_thd_is_applying(thd))) { #endif /* WITH_WSREP */ query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); @@ -11624,6 +11658,13 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) bool ignored_error= (idempotent_error == 0 ? ignored_error_code(actual_error) : 0); +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_ignored_error_code(this, actual_error)) + { + idempotent_error= true; + thd->wsrep_has_ignored_error= true; + } +#endif /* WITH_WSREP */ if (idempotent_error || ignored_error) { if (global_system_variables.log_warnings) @@ -11711,7 +11752,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) restore_empty_query_table_list(thd->lex); #if defined(WITH_WSREP) && defined(HAVE_QUERY_CACHE) - if (WSREP(thd) && thd->wsrep_exec_mode == REPL_RECV) + if (WSREP(thd) && wsrep_thd_is_applying(thd)) { query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); } |