diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 2800 |
1 files changed, 1687 insertions, 1113 deletions
diff --git a/sql/log.cc b/sql/log.cc index 53d81f22b66..9e9716dde67 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1,4 +1,5 @@ /* Copyright (c) 2000, 2011, Oracle and/or its affiliates. + Copyright (c) 2010-2011 Monty Program Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,25 +25,36 @@ Abort logging when we get an error in reading or writing log files */ -#include "mysql_priv.h" +#include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */ +#include "sql_priv.h" +#include "log.h" +#include "sql_base.h" // open_log_table #include "sql_repl.h" +#include "sql_delete.h" // mysql_truncate +#include "sql_parse.h" // command_name +#include "sql_time.h" // calc_time_from_sec, my_time_compare +#include "tztime.h" // my_tz_OFFSET0, struct Time_zone +#include "sql_acl.h" // SUPER_ACL +#include "log_event.h" // Query_log_event #include "rpl_filter.h" #include "rpl_rli.h" +#include "sql_audit.h" +#include "log_slow.h" #include <my_dir.h> #include <stdarg.h> #include <m_ctype.h> // For test_if_number -#ifdef __NT__ +#ifdef _WIN32 #include "message.h" #endif -#include <mysql/plugin.h> +#include "sql_plugin.h" +#include "rpl_handler.h" #include "debug_sync.h" /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 -#define MAX_USER_HOST_SIZE 512 #define MAX_TIME_SIZE 32 #define MY_OFF_T_UNDEF (~(my_off_t)0UL) @@ -50,11 +62,10 @@ LOGGER logger; -MYSQL_BIN_LOG mysql_bin_log; -ulong sync_binlog_counter= 0; +MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period); static bool test_if_number(const char *str, - long *res, bool allow_wildcards); + ulong *res, bool allow_wildcards); static int binlog_init(void *p); static int binlog_close_connection(handlerton *hton, THD *thd); static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv); @@ -73,9 +84,8 @@ ulong binlog_checksum_options; ulong opt_binlog_dbug_fsync_sleep= 0; #endif -static my_bool mutexes_inited; -pthread_mutex_t LOCK_prepare_ordered; -pthread_mutex_t LOCK_commit_ordered; +mysql_mutex_t LOCK_prepare_ordered; +mysql_mutex_t LOCK_commit_ordered; static ulonglong binlog_status_var_num_commits; static ulonglong binlog_status_var_num_group_commits; @@ -97,6 +107,35 @@ static SHOW_VAR binlog_status_vars_detail[]= /** + purge logs, master and slave sides both, related error code + convertor. + Called from @c purge_error_message(), @c MYSQL_BIN_LOG::reset_logs() + + @param res an internal to purging routines error code + + @return the user level error code ER_* +*/ +uint purge_log_get_error_code(int res) +{ + uint errcode= 0; + + switch (res) { + case 0: break; + case LOG_INFO_EOF: errcode= ER_UNKNOWN_TARGET_BINLOG; break; + case LOG_INFO_IO: errcode= ER_IO_ERR_LOG_INDEX_READ; break; + case LOG_INFO_INVALID:errcode= ER_BINLOG_PURGE_PROHIBITED; break; + case LOG_INFO_SEEK: errcode= ER_FSEEK_FAIL; break; + case LOG_INFO_MEM: errcode= ER_OUT_OF_RESOURCES; break; + case LOG_INFO_FATAL: errcode= ER_BINLOG_PURGE_FATAL_ERR; break; + case LOG_INFO_IN_USE: errcode= ER_LOG_IN_USE; break; + case LOG_INFO_EMFILE: errcode= ER_BINLOG_PURGE_EMFILE; break; + default: errcode= ER_LOG_PURGE_UNKNOWN_ERR; break; + } + + return errcode; +} + +/** Silence all errors and warnings reported when performing a write to a log table. Errors and warnings are not reported to the client or SQL exception @@ -114,23 +153,28 @@ public: virtual ~Silence_log_table_errors() {} - virtual bool handle_error(uint sql_errno, const char *message, - MYSQL_ERROR::enum_warning_level level, - THD *thd); + virtual bool handle_condition(THD *thd, + uint sql_errno, + const char* sql_state, + MYSQL_ERROR::enum_warning_level level, + const char* msg, + MYSQL_ERROR ** cond_hdl); const char *message() const { return m_message; } }; bool -Silence_log_table_errors::handle_error(uint /* sql_errno */, - const char *message_arg, - MYSQL_ERROR::enum_warning_level /* level */, - THD * /* thd */) -{ - strmake(m_message, message_arg, sizeof(m_message)-1); +Silence_log_table_errors::handle_condition(THD *, + uint, + const char*, + MYSQL_ERROR::enum_warning_level, + const char* msg, + MYSQL_ERROR ** cond_hdl) +{ + *cond_hdl= NULL; + strmake(m_message, msg, sizeof(m_message)-1); return TRUE; } - sql_print_message_func sql_print_message_handlers[3] = { sql_print_information, @@ -139,149 +183,285 @@ sql_print_message_func sql_print_message_handlers[3] = }; -char *make_default_log_name(char *buff,const char* log_ext) -{ - strmake(buff, pidfile_name, FN_REFLEN-5); - return fn_format(buff, buff, mysql_data_home, log_ext, - MYF(MY_UNPACK_FILENAME|MY_REPLACE_EXT)); -} - - -/* - Create a filename from a base with a given suffix. - The name is allocated trough my_once_alloc(), so one should only - use this for startup options that can all be freed at once. +/** + Create the name of the log file + + @param[OUT] out a pointer to a new allocated name will go there + @param[IN] log_ext The extension for the file (e.g .log) + @param[IN] once whether to use malloc_once or a normal malloc. */ - -char *make_once_alloced_filename(const char *basename, const char *ext) +void make_default_log_name(char **out, const char* log_ext, bool once) { - char buff[FN_REFLEN+10], *end, *res; - size_t length; - strmake(buff, basename, sizeof(buff)-10); - end= strmov(fn_ext(buff), ext); - length= (size_t) (end - buff) + 1; - - if ((res= (char*) my_once_alloc(length, MYF(MY_WME)))) - memcpy(res, buff, length); - return res; + char buff[FN_REFLEN+10]; + fn_format(buff, opt_log_basename, "", log_ext, MYF(MY_REPLACE_EXT)); + if (once) + *out= my_once_strdup(buff, MYF(MY_WME)); + else + { + my_free(*out); + *out= my_strdup(buff, MYF(MY_WME)); + } } /* - Helper class to store binary log transaction data. + Helper classes to store non-transactional and transactional data + before copying it to the binary log. */ -class binlog_trx_data { +class binlog_cache_data +{ public: - binlog_trx_data() - : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), - before_stmt_pos(MY_OFF_T_UNDEF), last_commit_pos_offset(0), - using_xa(FALSE), xa_xid(0) + binlog_cache_data(): m_pending(0), before_stmt_pos(MY_OFF_T_UNDEF), + incident(FALSE), changes_to_non_trans_temp_table_flag(FALSE), + saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0), + ptr_binlog_cache_disk_use(0) + { } + + ~binlog_cache_data() { - trans_log.end_of_file= max_binlog_cache_size; - last_commit_pos_file[0]= 0; + DBUG_ASSERT(empty()); + close_cached_file(&cache_log); } - ~binlog_trx_data() + bool empty() const { - DBUG_ASSERT(pending() == NULL); - close_cached_file(&trans_log); + return pending() == NULL && my_b_tell(&cache_log) == 0; } - my_off_t position() const { - return my_b_tell(&trans_log); + Rows_log_event *pending() const + { + return m_pending; } - bool empty() const + void set_pending(Rows_log_event *const pending) { - return pending() == NULL && my_b_tell(&trans_log) == 0; + m_pending= pending; } - /* - Truncate the transaction cache to a certain position. This - includes deleting the pending event. - */ - void truncate(my_off_t pos) + void set_incident(void) { - DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); - DBUG_PRINT("info", ("before_stmt_pos=%lu", (ulong) pos)); - if (pending()) - { - delete pending(); - } - set_pending(0); - reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0); - trans_log.end_of_file= max_binlog_cache_size; - if (pos < before_stmt_pos) - before_stmt_pos= MY_OFF_T_UNDEF; + incident= TRUE; + } + + bool has_incident(void) + { + return(incident); + } - /* - The only valid positions that can be truncated to are at the - beginning of a statement. We are relying on this fact to be able - to set the at_least_one_stmt_committed flag correctly. In other word, if - we are truncating to the beginning of the transaction cache, - there will be no statements in the cache, otherwhise, we will - have at least one statement in the transaction cache. - */ - at_least_one_stmt_committed= (pos > 0); + void set_changes_to_non_trans_temp_table() + { + changes_to_non_trans_temp_table_flag= TRUE; } - /* - Reset the entire contents of the transaction cache, emptying it - completely. - */ - void reset() { - if (trans_log.type != WRITE_CACHE || !empty()) - truncate(0); - before_stmt_pos= MY_OFF_T_UNDEF; + bool changes_to_non_trans_temp_table() + { + return (changes_to_non_trans_temp_table_flag); + } + + void reset() + { + compute_statistics(); + truncate(0); + changes_to_non_trans_temp_table_flag= FALSE; incident= FALSE; - trans_log.end_of_file= max_binlog_cache_size; - using_xa= FALSE; - last_commit_pos_file[0]= 0; - last_commit_pos_offset= 0; + before_stmt_pos= MY_OFF_T_UNDEF; + /* + The truncate function calls reinit_io_cache that calls my_b_flush_io_cache + which may increase disk_writes. This breaks the disk_writes use by the + binary log which aims to compute the ratio between in-memory cache usage + and disk cache usage. To avoid this undesirable behavior, we reset the + variable after truncating the cache. + */ + cache_log.disk_writes= 0; DBUG_ASSERT(empty()); } - Rows_log_event *pending() const + my_off_t get_byte_position() const { - return m_pending; + return my_b_tell(&cache_log); } - void set_pending(Rows_log_event *const pending) + my_off_t get_prev_position() { - m_pending= pending; + return(before_stmt_pos); } - IO_CACHE trans_log; // The transaction cache - - void set_incident(void) + void set_prev_position(my_off_t pos) { - incident= TRUE; + before_stmt_pos= pos; } - bool has_incident(void) + void restore_prev_position() { - return(incident); + truncate(before_stmt_pos); } - /** - Boolean that is true if there is at least one statement in the - transaction cache. + void restore_savepoint(my_off_t pos) + { + truncate(pos); + if (pos < before_stmt_pos) + before_stmt_pos= MY_OFF_T_UNDEF; + } + + void set_binlog_cache_info(ulong param_max_binlog_cache_size, + ulong *param_ptr_binlog_cache_use, + ulong *param_ptr_binlog_cache_disk_use) + { + /* + The assertions guarantee that the set_binlog_cache_info is + called just once and information passed as parameters are + never zero. + + This is done while calling the constructor binlog_cache_mngr. + We cannot set informaton in the constructor binlog_cache_data + because the space for binlog_cache_mngr is allocated through + a placement new. + + In the future, we can refactor this and change it to avoid + the set_binlog_info. + */ + DBUG_ASSERT(saved_max_binlog_cache_size == 0 && + param_max_binlog_cache_size != 0 && + ptr_binlog_cache_use == 0 && + param_ptr_binlog_cache_use != 0 && + ptr_binlog_cache_disk_use == 0 && + param_ptr_binlog_cache_disk_use != 0); + + saved_max_binlog_cache_size= param_max_binlog_cache_size; + ptr_binlog_cache_use= param_ptr_binlog_cache_use; + ptr_binlog_cache_disk_use= param_ptr_binlog_cache_disk_use; + cache_log.end_of_file= saved_max_binlog_cache_size; + } + + /* + Cache to store data before copying it to the binary log. */ - bool at_least_one_stmt_committed; - bool incident; + IO_CACHE cache_log; private: /* - Pending binrows event. This event is the event where the rows are - currently written. + Pending binrows event. This event is the event where the rows are currently + written. */ Rows_log_event *m_pending; -public: /* Binlog position before the start of the current statement. */ my_off_t before_stmt_pos; + + /* + This indicates that some events did not get into the cache and most likely + it is corrupted. + */ + bool incident; + + /* + This flag indicates if the cache has changes to temporary tables. + @TODO This a temporary fix and should be removed after BUG#54562. + */ + bool changes_to_non_trans_temp_table_flag; + + /** + This function computes binlog cache and disk usage. + */ + void compute_statistics() + { + if (!empty()) + { + statistic_increment(*ptr_binlog_cache_use, &LOCK_status); + if (cache_log.disk_writes != 0) + statistic_increment(*ptr_binlog_cache_disk_use, &LOCK_status); + } + } + + /* + Stores the values of maximum size of the cache allowed when this cache + is configured. This corresponds to either + . max_binlog_cache_size or max_binlog_stmt_cache_size. + */ + ulong saved_max_binlog_cache_size; + + /* + Stores a pointer to the status variable that keeps track of the in-memory + cache usage. This corresponds to either + . binlog_cache_use or binlog_stmt_cache_use. + */ + ulong *ptr_binlog_cache_use; + + /* + Stores a pointer to the status variable that keeps track of the disk + cache usage. This corresponds to either + . binlog_cache_disk_use or binlog_stmt_cache_disk_use. + */ + ulong *ptr_binlog_cache_disk_use; + + /* + It truncates the cache to a certain position. This includes deleting the + pending event. + */ + void truncate(my_off_t pos) + { + DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos)); + if (pending()) + { + delete pending(); + set_pending(0); + } + reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0); + cache_log.end_of_file= saved_max_binlog_cache_size; + } + + binlog_cache_data& operator=(const binlog_cache_data& info); + binlog_cache_data(const binlog_cache_data& info); +}; + +class binlog_cache_mngr { +public: + binlog_cache_mngr(ulong param_max_binlog_stmt_cache_size, + ulong param_max_binlog_cache_size, + ulong *param_ptr_binlog_stmt_cache_use, + ulong *param_ptr_binlog_stmt_cache_disk_use, + ulong *param_ptr_binlog_cache_use, + ulong *param_ptr_binlog_cache_disk_use) + : last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0) + { + stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size, + param_ptr_binlog_stmt_cache_use, + param_ptr_binlog_stmt_cache_disk_use); + trx_cache.set_binlog_cache_info(param_max_binlog_cache_size, + param_ptr_binlog_cache_use, + param_ptr_binlog_cache_disk_use); + last_commit_pos_file[0]= 0; + } + + void reset(bool do_stmt, bool do_trx) + { + if (do_stmt) + stmt_cache.reset(); + if (do_trx) + { + trx_cache.reset(); + using_xa= FALSE; + last_commit_pos_file[0]= 0; + last_commit_pos_offset= 0; + } + } + + binlog_cache_data* get_binlog_cache_data(bool is_transactional) + { + return (is_transactional ? &trx_cache : &stmt_cache); + } + + IO_CACHE* get_binlog_cache_log(bool is_transactional) + { + return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log); + } + + binlog_cache_data stmt_cache; + + binlog_cache_data trx_cache; + /* Binlog position for current transaction. For START TRANSACTION WITH CONSISTENT SNAPSHOT, this is the binlog @@ -298,6 +478,11 @@ public: */ bool using_xa; my_xid xa_xid; + +private: + + binlog_cache_mngr& operator=(const binlog_cache_mngr& info); + binlog_cache_mngr(const binlog_cache_mngr& info); }; handlerton *binlog_hton; @@ -317,8 +502,8 @@ bool LOGGER::is_log_table_enabled(uint log_table_type) /* Check if a given table is opened log table */ -int check_if_log_table(uint db_len, const char *db, uint table_name_len, - const char *table_name, uint check_if_opened) +int check_if_log_table(size_t db_len, const char *db, size_t table_name_len, + const char *table_name, bool check_if_opened) { if (db_len == 5 && !(lower_case_table_names ? @@ -410,7 +595,7 @@ bool Log_to_csv_event_handler:: bool need_rnd_end= FALSE; uint field_index; Silence_log_table_errors error_handler; - Open_tables_state open_tables_backup; + Open_tables_backup open_tables_backup; ulonglong save_thd_options; bool save_time_zone_used; DBUG_ENTER("log_general"); @@ -421,20 +606,16 @@ bool Log_to_csv_event_handler:: */ save_time_zone_used= thd->time_zone_used; - save_thd_options= thd->options; - thd->options&= ~OPTION_BIN_LOG; - - bzero(& table_list, sizeof(TABLE_LIST)); - table_list.alias= table_list.table_name= GENERAL_LOG_NAME.str; - table_list.table_name_length= GENERAL_LOG_NAME.length; - - table_list.lock_type= TL_WRITE_CONCURRENT_INSERT; + save_thd_options= thd->variables.option_bits; + thd->variables.option_bits&= ~OPTION_BIN_LOG; - table_list.db= MYSQL_SCHEMA_NAME.str; - table_list.db_length= MYSQL_SCHEMA_NAME.length; + table_list.init_one_table(MYSQL_SCHEMA_NAME.str, MYSQL_SCHEMA_NAME.length, + GENERAL_LOG_NAME.str, GENERAL_LOG_NAME.length, + GENERAL_LOG_NAME.str, + TL_WRITE_CONCURRENT_INSERT); /* - 1) open_performance_schema_table generates an error of the + 1) open_log_table generates an error of the table can not be opened or is corrupted. 2) "INSERT INTO general_log" can generate warning sometimes. @@ -447,8 +628,7 @@ bool Log_to_csv_event_handler:: thd->push_internal_handler(& error_handler); need_pop= TRUE; - if (!(table= open_performance_schema_table(thd, & table_list, - & open_tables_backup))) + if (!(table= open_log_table(thd, &table_list, &open_tables_backup))) goto err; need_close= TRUE; @@ -528,9 +708,9 @@ err: if (need_pop) thd->pop_internal_handler(); if (need_close) - close_performance_schema_table(thd, & open_tables_backup); + close_log_table(thd, &open_tables_backup); - thd->options= save_thd_options; + thd->variables.option_bits= save_thd_options; thd->time_zone_used= save_time_zone_used; DBUG_RETURN(result); } @@ -576,7 +756,7 @@ bool Log_to_csv_event_handler:: bool need_close= FALSE; bool need_rnd_end= FALSE; Silence_log_table_errors error_handler; - Open_tables_state open_tables_backup; + Open_tables_backup open_tables_backup; CHARSET_INFO *client_cs= thd->variables.character_set_client; bool save_time_zone_used; long query_time= (long) min(query_utime/1000000, TIME_MAX_VALUE_SECONDS); @@ -593,17 +773,12 @@ bool Log_to_csv_event_handler:: */ save_time_zone_used= thd->time_zone_used; - bzero(& table_list, sizeof(TABLE_LIST)); - table_list.alias= table_list.table_name= SLOW_LOG_NAME.str; - table_list.table_name_length= SLOW_LOG_NAME.length; + table_list.init_one_table(MYSQL_SCHEMA_NAME.str, MYSQL_SCHEMA_NAME.length, + SLOW_LOG_NAME.str, SLOW_LOG_NAME.length, + SLOW_LOG_NAME.str, + TL_WRITE_CONCURRENT_INSERT); - table_list.lock_type= TL_WRITE_CONCURRENT_INSERT; - - table_list.db= MYSQL_SCHEMA_NAME.str; - table_list.db_length= MYSQL_SCHEMA_NAME.length; - - if (!(table= open_performance_schema_table(thd, & table_list, - & open_tables_backup))) + if (!(table= open_log_table(thd, &table_list, &open_tables_backup))) goto err; need_close= TRUE; @@ -717,7 +892,7 @@ err: table->file->ha_release_auto_increment(); } if (need_close) - close_performance_schema_table(thd, & open_tables_backup); + close_log_table(thd, &open_tables_backup); thd->time_zone_used= save_time_zone_used; DBUG_RETURN(result); } @@ -727,36 +902,31 @@ int Log_to_csv_event_handler:: { TABLE_LIST table_list; TABLE *table; + LEX_STRING *UNINIT_VAR(log_name); int result; - Open_tables_state open_tables_backup; + Open_tables_backup open_tables_backup; DBUG_ENTER("Log_to_csv_event_handler::activate_log"); - bzero(& table_list, sizeof(TABLE_LIST)); - if (log_table_type == QUERY_LOG_GENERAL) { - table_list.alias= table_list.table_name= GENERAL_LOG_NAME.str; - table_list.table_name_length= GENERAL_LOG_NAME.length; + log_name= &GENERAL_LOG_NAME; } else { DBUG_ASSERT(log_table_type == QUERY_LOG_SLOW); - table_list.alias= table_list.table_name= SLOW_LOG_NAME.str; - table_list.table_name_length= SLOW_LOG_NAME.length; - } - - table_list.lock_type= TL_WRITE_CONCURRENT_INSERT; - table_list.db= MYSQL_SCHEMA_NAME.str; - table_list.db_length= MYSQL_SCHEMA_NAME.length; + log_name= &SLOW_LOG_NAME; + } + table_list.init_one_table(MYSQL_SCHEMA_NAME.str, MYSQL_SCHEMA_NAME.length, + log_name->str, log_name->length, log_name->str, + TL_WRITE_CONCURRENT_INSERT); - table= open_performance_schema_table(thd, & table_list, - & open_tables_backup); + table= open_log_table(thd, &table_list, &open_tables_backup); if (table) { result= 0; - close_performance_schema_table(thd, & open_tables_backup); + close_log_table(thd, &open_tables_backup); } else result= 1; @@ -833,10 +1003,10 @@ bool Log_to_file_event_handler::init() if (!is_initialized) { if (opt_slow_log) - mysql_slow_log.open_slow_log(sys_var_slow_log_path.value); + mysql_slow_log.open_slow_log(opt_slow_logname); if (opt_log) - mysql_log.open_query_log(sys_var_general_log_path.value); + mysql_log.open_query_log(opt_logname); is_initialized= TRUE; } @@ -860,13 +1030,6 @@ void Log_to_file_event_handler::flush() mysql_slow_log.reopen_file(); } -void Log_to_file_event_handler::flush_slow_log() -{ - /* reopen slow log file */ - if (opt_slow_log) - mysql_slow_log.reopen_file(); -} - /* Log error with all enabled log event handlers @@ -900,7 +1063,7 @@ bool LOGGER::error_log_print(enum loglevel level, const char *format, void LOGGER::cleanup_base() { DBUG_ASSERT(inited == 1); - rwlock_destroy(&LOCK_logger); + mysql_rwlock_destroy(&LOCK_logger); if (table_log_handler) { table_log_handler->cleanup(); @@ -945,7 +1108,7 @@ void LOGGER::init_base() init_error_log(LOG_FILE); file_log_handler->init_pthread_objects(); - my_rwlock_init(&LOCK_logger, NULL); + mysql_rwlock_init(key_rwlock_LOCK_logger, &LOCK_logger); } @@ -977,7 +1140,12 @@ bool LOGGER::flush_logs(THD *thd) } -bool LOGGER::flush_slow_log(THD *thd) +/** + Close and reopen the slow log (with locks). + + @returns FALSE. +*/ +bool LOGGER::flush_slow_log() { /* Now we lock logger, as nobody should be able to use logging routines while @@ -985,11 +1153,37 @@ bool LOGGER::flush_slow_log(THD *thd) */ logger.lock_exclusive(); - /* reopen log files */ - file_log_handler->flush_slow_log(); + /* Reopen slow log file */ + if (opt_slow_log) + file_log_handler->get_mysql_slow_log()->reopen_file(); - /* end of log flush */ + /* End of log flush */ + logger.unlock(); + + return 0; +} + + +/** + Close and reopen the general log (with locks). + + @returns FALSE. +*/ +bool LOGGER::flush_general_log() +{ + /* + Now we lock logger, as nobody should be able to use logging routines while + log tables are closed + */ + logger.lock_exclusive(); + + /* Reopen general log file */ + if (opt_log) + file_log_handler->get_mysql_log()->reopen_file(); + + /* End of log flush */ logger.unlock(); + return 0; } @@ -1079,7 +1273,6 @@ bool LOGGER::general_log_write(THD *thd, enum enum_server_command command, bool error= FALSE; Log_event_handler **current_handler= general_log_handler_list; char user_host_buff[MAX_USER_HOST_SIZE + 1]; - Security_context *sctx= thd->security_ctx; uint user_host_len= 0; my_hrtime_t current_time; @@ -1091,14 +1284,16 @@ bool LOGGER::general_log_write(THD *thd, enum enum_server_command command, unlock(); return 0; } - user_host_len= strxnmov(user_host_buff, MAX_USER_HOST_SIZE, - sctx->priv_user ? sctx->priv_user : "", "[", - sctx->user ? sctx->user : "", "] @ ", - sctx->host ? sctx->host : "", " [", - sctx->ip ? sctx->ip : "", "]", NullS) - - user_host_buff; + user_host_len= make_user_name(thd, user_host_buff); current_time= my_hrtime(); + + mysql_audit_general_log(thd, hrtime_to_time(current_time), + user_host_buff, user_host_len, + command_name[(uint) command].str, + command_name[(uint) command].length, + query, query_length); + while (*current_handler) error|= (*current_handler++)-> log_general(thd, current_time, user_host_buff, @@ -1128,7 +1323,7 @@ bool LOGGER::general_log_print(THD *thd, enum enum_server_command command, return general_log_write(thd, command, message_buff, message_buff_len); } -void LOGGER::init_error_log(uint error_log_printer) +void LOGGER::init_error_log(ulonglong error_log_printer) { if (error_log_printer & LOG_NONE) { @@ -1151,7 +1346,7 @@ void LOGGER::init_error_log(uint error_log_printer) } } -void LOGGER::init_slow_log(uint slow_log_printer) +void LOGGER::init_slow_log(ulonglong slow_log_printer) { if (slow_log_printer & LOG_NONE) { @@ -1176,7 +1371,7 @@ void LOGGER::init_slow_log(uint slow_log_printer) } } -void LOGGER::init_general_log(uint general_log_printer) +void LOGGER::init_general_log(ulonglong general_log_printer) { if (general_log_printer & LOG_NONE) { @@ -1213,7 +1408,7 @@ bool LOGGER::activate_log_handler(THD* thd, uint log_type) { file_log= file_log_handler->get_mysql_slow_log(); - file_log->open_slow_log(sys_var_slow_log_path.value); + file_log->open_slow_log(opt_slow_logname); if (table_log_handler->activate_log(thd, QUERY_LOG_SLOW)) { /* Error printed by open table in activate_log() */ @@ -1232,7 +1427,7 @@ bool LOGGER::activate_log_handler(THD* thd, uint log_type) { file_log= file_log_handler->get_mysql_log(); - file_log->open_query_log(sys_var_general_log_path.value); + file_log->open_query_log(opt_logname); if (table_log_handler->activate_log(thd, QUERY_LOG_GENERAL)) { /* Error printed by open table in activate_log() */ @@ -1289,9 +1484,9 @@ bool Log_to_csv_event_handler::init() return 0; } -int LOGGER::set_handlers(uint error_log_printer, - uint slow_log_printer, - uint general_log_printer) +int LOGGER::set_handlers(ulonglong error_log_printer, + ulonglong slow_log_printer, + ulonglong general_log_printer) { /* error log table is not supported yet */ DBUG_ASSERT(error_log_printer < LOG_TABLE); @@ -1317,26 +1512,6 @@ int LOGGER::set_handlers(uint error_log_printer, return 0; } -/** - This function checks if a transactional talbe was updated by the - current statement. - - @param thd The client thread that executed the current statement. - @return - @c true if a transactional table was updated, @false otherwise. -*/ -static bool stmt_has_updated_trans_table(THD *thd) -{ - Ha_trx_info *ha_info; - - for (ha_info= thd->transaction.stmt.ha_list; ha_info && ha_info->is_started(); ha_info= ha_info->next()) - { - if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) - return (TRUE); - } - return (FALSE); -} - /* Save position of binary log transaction cache. @@ -1359,10 +1534,10 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos) DBUG_ASSERT(pos != NULL); if (thd_get_ha_data(thd, binlog_hton) == NULL) thd->binlog_setup_trx_data(); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); DBUG_ASSERT(mysql_bin_log.is_open()); - *pos= trx_data->position(); + *pos= cache_mngr->trx_cache.get_byte_position(); DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos)); DBUG_VOID_RETURN; } @@ -1393,9 +1568,9 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) /* Only true if binlog_trans_log_savepos() wasn't called before */ DBUG_ASSERT(pos != ~(my_off_t) 0); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - trx_data->truncate(pos); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + cache_mngr->trx_cache.restore_savepoint(pos); DBUG_VOID_RETURN; } @@ -1425,130 +1600,214 @@ int binlog_init(void *p) static int binlog_close_connection(handlerton *hton, THD *thd) { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - DBUG_ASSERT(trx_data->empty()); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + DBUG_ASSERT(cache_mngr->trx_cache.empty() && cache_mngr->stmt_cache.empty()); thd_set_ha_data(thd, binlog_hton, NULL); - trx_data->~binlog_trx_data(); - my_free((uchar*)trx_data, MYF(0)); + cache_mngr->~binlog_cache_mngr(); + my_free(cache_mngr); return 0; } /* - End a transaction, writing events to the binary log. + This function flushes a cache upon commit/rollback. SYNOPSIS - binlog_flush_trx_cache() + binlog_flush_cache() - thd The thread whose transaction should be ended - trx_data Pointer to the transaction data to use - all True if the entire transaction should be ended, false if - only the statement transaction should be ended. - end_ev The end event to use (COMMIT, ROLLBACK, or commit XID) + thd The thread whose transaction should be ended + cache_mngr Pointer to the binlog_cache_mngr to use + all True if the entire transaction should be ended, false if + only the statement transaction should be ended. + end_ev The end event to use (COMMIT, ROLLBACK, or commit XID) + using_stmt True if the statement cache should be flushed + using_trx True if the transaction cache should be flushed DESCRIPTION - End the currently open transaction. The transaction can be either + End the currently transaction or statement. The transaction can be either a real transaction or a statement transaction. This can be to commit a transaction, with a COMMIT query event or an XA commit XID event. But it can also be to rollback a transaction with a ROLLBACK query event, used for rolling back transactions which also - contain updates to non-transactional tables. + contain updates to non-transactional tables. Or it can be a flush of + a statement cache. */ static int -binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev, bool all) +binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, + Log_event *end_ev, bool all, bool using_stmt, + bool using_trx) { - DBUG_ENTER("binlog_flush_trx_cache"); - IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_PRINT("info", ("thd->options={ %s%s}", - FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), - FLAGSTR(thd->options, OPTION_BEGIN))); - - if (thd->binlog_flush_pending_rows_event(TRUE)) - DBUG_RETURN(1); - - /* - Doing a commit or a rollback including non-transactional tables, - i.e., ending a transaction where we might write the transaction - cache to the binary log. - - We can always end the statement when ending a transaction since - transactions are not allowed inside stored functions. If they - were, we would have to ensure that we're not ending a statement - inside a stored function. - */ - int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, - end_ev, all); - - trx_data->reset(); + int error= 0; + DBUG_ENTER("binlog_flush_cache"); - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) + if ((using_stmt && !cache_mngr->stmt_cache.empty()) || + (using_trx && !cache_mngr->trx_cache.empty())) { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; + if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE)) + DBUG_RETURN(1); + if (using_trx && thd->binlog_flush_pending_rows_event(TRUE, TRUE)) + DBUG_RETURN(1); + + /* + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + + We can always end the statement when ending a transaction since + transactions are not allowed inside stored functions. If they + were, we would have to ensure that we're not ending a statement + inside a stored function. + */ + error= mysql_bin_log.write_transaction_to_binlog(thd, cache_mngr, + end_ev, all, + using_stmt, using_trx); } + cache_mngr->reset(using_stmt, using_trx); - DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); + DBUG_ASSERT((!using_stmt || cache_mngr->stmt_cache.empty()) && + (!using_trx || cache_mngr->trx_cache.empty())); DBUG_RETURN(error); } -/* - Discard a transaction, ie. ROLLBACK with only transactional table updates. - SYNOPSIS - binlog_truncate_trx_cache() +/** + This function flushes the stmt-cache upon commit. - thd The thread whose transaction should be ended - trx_data Pointer to the transaction data to use - all True if the entire transaction should be ended, false if - only the statement transaction should be ended. + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache manager - DESCRIPTION + @return + nonzero if an error pops up when flushing the cache. +*/ +static inline int +binlog_commit_flush_stmt_cache(THD *thd, bool all, + binlog_cache_mngr *cache_mngr) +{ + Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), + FALSE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE)); +} - Rollback (and end) a transaction that only modifies transactional - tables. The transaction can be either a real transaction (if 'all' is - true) or a statement transaction (if 'all' is false). +/** + This function flushes the trx-cache upon commit. - The transaction cache will be truncated to either just before the last - opened statement transaction (if 'all' is false), or reset completely (if - 'all' is true). - */ + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache manager + + @return + nonzero if an error pops up when flushing the cache. +*/ +static inline int +binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) +{ + Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), + TRUE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); +} + +/** + This function flushes the trx-cache upon rollback. + + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache manager + + @return + nonzero if an error pops up when flushing the cache. +*/ +static inline int +binlog_rollback_flush_trx_cache(THD *thd, bool all, + binlog_cache_mngr *cache_mngr) +{ + Query_log_event end_evt(thd, STRING_WITH_LEN("ROLLBACK"), + TRUE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); +} + +/** + This function flushes the trx-cache upon commit. + + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache manager + @param xid Transaction Id + + @return + nonzero if an error pops up when flushing the cache. +*/ +static inline int +binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr, + bool all, my_xid xid) +{ + if (xid) + { + Xid_log_event end_evt(thd, xid, TRUE); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); + } + else + { + /* + Empty xid occurs in XA COMMIT ... ONE PHASE. + In this case, we do not have a MySQL xid for the transaction, and the + external XA transaction coordinator will have to handle recovery if + needed. So we end the transaction with a plain COMMIT query event. + */ + Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), + TRUE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); + } +} + +/** + This function truncates the transactional cache upon committing or rolling + back either a transaction or a statement. + + @param thd The thread whose transaction should be flushed + @param cache_mngr Pointer to the cache data to be flushed + @param all @c true means truncate the transaction, otherwise the + statement must be truncated. + + @return + nonzero if an error pops up when truncating the transactional cache. +*/ static int -binlog_truncate_trx_cache(THD *thd, binlog_trx_data *trx_data, bool all) +binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) { DBUG_ENTER("binlog_truncate_trx_cache"); - int error= 0; - DBUG_PRINT("enter", ("transaction: %s", all ? "all" : "stmt")); - DBUG_PRINT("info", ("thd->options={ %s%s}", - FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), - FLAGSTR(thd->options, OPTION_BEGIN))); - + int error=0; /* - ROLLBACK with nothing to replicate: i.e., rollback of only transactional - tables. + This function handles transactional changes and as such this flag + equals to true. */ + bool const is_transactional= TRUE; + + DBUG_PRINT("info", ("thd->options={ %s %s}, transaction: %s", + FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->variables.option_bits, OPTION_BEGIN), + all ? "all" : "stmt")); + thd->binlog_remove_pending_rows_event(TRUE, is_transactional); /* If rolling back an entire transaction or a single statement not inside a transaction, we reset the transaction cache. - - If rolling back a statement in a transaction, we truncate the - transaction cache to remove the statement. - */ - thd->binlog_remove_pending_rows_event(TRUE); - if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + */ + if (ending_trans(thd, all)) { - if (trx_data->has_incident()) + if (cache_mngr->trx_cache.has_incident()) error= mysql_bin_log.write_incident(thd); - trx_data->reset(); + + thd->clear_binlog_table_maps(); + + cache_mngr->reset(false, true); } - else // ...statement - trx_data->truncate(trx_data->before_stmt_pos); + /* + If rolling back a statement in a transaction, we truncate the + transaction cache to remove the statement. + */ + else + cache_mngr->trx_cache.restore_prev_position(); - DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); + DBUG_ASSERT(thd->binlog_get_pending_rows_event(is_transactional) == NULL); DBUG_RETURN(error); } @@ -1566,8 +1825,7 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) /** This function is called once after each statement. - It has the responsibility to flush the transaction cache to the - binlog file on commits. + It has the responsibility to flush the caches to the binary log on commits. @param hton The binlog handlerton. @param thd The client thread that executes the transaction. @@ -1580,53 +1838,50 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) { int error= 0; DBUG_ENTER("binlog_commit"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - - if (trx_data->empty()) - { - // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() - trx_data->reset(); - DBUG_RETURN(0); - } - - /* - We flush the cache if: - - - we are committing a transaction or; - - no statement was committed before and just non-transactional - tables were updated. + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - Otherwise, we collect the changes. - */ DBUG_PRINT("debug", - ("all: %d, empty: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", + ("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", all, - YESNO(trx_data->empty()), + YESNO(thd->in_multi_stmt_transaction_mode()), YESNO(thd->transaction.all.modified_non_trans_table), YESNO(thd->transaction.stmt.modified_non_trans_table))); - if (ending_trans(thd, all) || - (trans_has_no_stmt_committed(thd, all) && - !stmt_has_updated_trans_table(thd) && stmt_has_updated_non_trans_table(thd))) + + if (!cache_mngr->stmt_cache.empty()) + { + error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); + } + + if (cache_mngr->trx_cache.empty()) { - Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all); + /* + we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() + */ + cache_mngr->reset(false, true); + DBUG_RETURN(error); } - trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; + /* + We commit the transaction if: + - We are not in a transaction and committing a statement, or + - We are in a transaction and a full transaction is committed. + Otherwise, we accumulate the changes. + */ + if (!error && ending_trans(thd, all)) + error= binlog_commit_flush_trx_cache(thd, all, cache_mngr); + /* + This is part of the stmt rollback. + */ if (!all) - trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt commit + cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); + DBUG_RETURN(error); } /** - This function is called when a transaction involving a transactional - table is rolled back. - - It has the responsibility to flush the transaction cache to the - binlog file. However, if the transaction does not involve - non-transactional tables, nothing needs to be logged. + This function is called when a transaction or a statement is rolled back. @param hton The binlog handlerton. @param thd The client thread that executes the transaction. @@ -1638,19 +1893,38 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) static int binlog_rollback(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("binlog_rollback"); - int error=0; - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - - if (trx_data->empty()) { - trx_data->reset(); - DBUG_RETURN(0); - } + int error= 0; + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", YESNO(all), YESNO(thd->transaction.all.modified_non_trans_table), YESNO(thd->transaction.stmt.modified_non_trans_table))); + + /* + If an incident event is set we do not flush the content of the statement + cache because it may be corrupted. + */ + if (cache_mngr->stmt_cache.has_incident()) + { + error= mysql_bin_log.write_incident(thd); + cache_mngr->reset(true, false); + } + else if (!cache_mngr->stmt_cache.empty()) + { + error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); + } + + if (cache_mngr->trx_cache.empty()) + { + /* + we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() + */ + cache_mngr->reset(false, true); + DBUG_RETURN(error); + } + if (mysql_bin_log.check_write_error(thd)) { /* @@ -1661,68 +1935,61 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ DBUG_ASSERT(!all); /* - We reach this point if either only transactional tables were modified or - the effect of a statement that did not get into the binlog needs to be - rolled back. In the latter case, if a statement changed non-transactional - tables or had the OPTION_KEEP_LOG associated, we write an incident event - to the binlog in order to stop slaves and notify users that some changes - on the master did not get into the binlog and slaves will be inconsistent. - On the other hand, if a statement is transactional, we just safely roll it - back. + We reach this point if the effect of a statement did not properly get into + a cache and need to be rolled back. */ - if ((stmt_has_updated_non_trans_table(thd) || - (thd->options & OPTION_KEEP_LOG)) && - mysql_bin_log.check_write_error(thd)) - trx_data->set_incident(); - error= binlog_truncate_trx_cache(thd, trx_data, all); + error |= binlog_truncate_trx_cache(thd, cache_mngr, all); } - else - { - /* - We flush the cache with a rollback, wrapped in a begin/rollback if: - . aborting a transaction that modified a non-transactional table or - the OPTION_KEEP_LOG is activate. - . aborting a statement that modified both transactional and - non-transactional tables but which is not in the boundaries of any - transaction or there was no early change; + else if (!error) + { + /* + We flush the cache wrapped in a beging/rollback if: + . aborting a single or multi-statement transaction and; + . the OPTION_KEEP_LOG is active or; + . the format is STMT and a non-trans table was updated or; + . the format is MIXED and a temporary non-trans table was + updated or; + . the format is MIXED, non-trans table was updated and + aborting a single statement transaction; */ - if ((ending_trans(thd, all) && - (trans_has_updated_non_trans_table(thd) || - (thd->options & OPTION_KEEP_LOG))) || - (trans_has_no_stmt_committed(thd, all) && - stmt_has_updated_non_trans_table(thd) && - thd->current_stmt_binlog_row_based)) - { - Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all); - } + if (ending_trans(thd, all) && + ((thd->variables.option_bits & OPTION_KEEP_LOG) || + (trans_has_updated_non_trans_table(thd) && + thd->variables.binlog_format == BINLOG_FORMAT_STMT) || + (cache_mngr->trx_cache.changes_to_non_trans_temp_table() && + thd->variables.binlog_format == BINLOG_FORMAT_MIXED) || + (trans_has_updated_non_trans_table(thd) && + ending_single_stmt_trans(thd,all) && + thd->variables.binlog_format == BINLOG_FORMAT_MIXED))) + error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr); /* - Otherwise, we simply truncate the cache as there is no change on - non-transactional tables as follows. + Truncate the cache if: + . aborting a single or multi-statement transaction or; + . the OPTION_KEEP_LOG is not active and; + . the format is not STMT or no non-trans table was + updated and; + . the format is not MIXED or no temporary non-trans table + was updated. */ else if (ending_trans(thd, all) || - (!(thd->options & OPTION_KEEP_LOG) && !stmt_has_updated_non_trans_table(thd))) - error= binlog_truncate_trx_cache(thd, trx_data, all); + (!(thd->variables.option_bits & OPTION_KEEP_LOG) && + (!stmt_has_updated_non_trans_table(thd) || + thd->variables.binlog_format != BINLOG_FORMAT_STMT) && + (!cache_mngr->trx_cache.changes_to_non_trans_temp_table() || + thd->variables.binlog_format != BINLOG_FORMAT_MIXED))) + error= binlog_truncate_trx_cache(thd, cache_mngr, all); } - if (!all) - trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback - DBUG_RETURN(error); -} -/** - Cleanup the cache. - - @param thd The client thread that wants to clean up the cache. -*/ -void MYSQL_BIN_LOG::reset_gathered_updates(THD *thd) -{ - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + /* + This is part of the stmt rollback. + */ + if (!all) + cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); - trx_data->reset(); + DBUG_RETURN(error); } -void MYSQL_BIN_LOG::set_write_error(THD *thd) +void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::set_write_error"); @@ -1732,9 +1999,20 @@ void MYSQL_BIN_LOG::set_write_error(THD *thd) DBUG_VOID_RETURN; if (my_errno == EFBIG) - my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(MY_WME)); + { + if (is_transactional) + { + my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(MY_WME)); + } + else + { + my_message(ER_STMT_CACHE_FULL, ER(ER_STMT_CACHE_FULL), MYF(MY_WME)); + } + } else + { my_error(ER_ERROR_ON_WRITE, MYF(MY_WME), name, errno); + } DBUG_VOID_RETURN; } @@ -1748,9 +2026,10 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd) if (!thd->is_error()) DBUG_RETURN(checked); - switch (thd->main_da.sql_errno()) + switch (thd->stmt_da->sql_errno()) { case ER_TRANS_CACHE_FULL: + case ER_STMT_CACHE_FULL: case ER_ERROR_ON_WRITE: case ER_BINLOG_LOGGING_IMPOSSIBLE: checked= TRUE; @@ -1800,7 +2079,7 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) DBUG_RETURN(1); int errcode= query_error_code(thd, thd->killed == NOT_KILLED); Query_log_event qinfo(thd, log_query.ptr(), log_query.length(), - TRUE, TRUE, errcode); + TRUE, FALSE, TRUE, errcode); DBUG_RETURN(mysql_bin_log.write(&qinfo)); } @@ -1813,8 +2092,8 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) non-transactional table. Otherwise, truncate the binlog cache starting from the SAVEPOINT command. */ - if (unlikely(trans_has_updated_non_trans_table(thd) || - (thd->options & OPTION_KEEP_LOG))) + if (unlikely(trans_has_updated_non_trans_table(thd) || + (thd->variables.option_bits & OPTION_KEEP_LOG))) { String log_query; if (log_query.append(STRING_WITH_LEN("ROLLBACK TO ")) || @@ -1824,7 +2103,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) DBUG_RETURN(1); int errcode= query_error_code(thd, thd->killed == NOT_KILLED); Query_log_event qinfo(thd, log_query.ptr(), log_query.length(), - TRUE, TRUE, errcode); + TRUE, FALSE, TRUE, errcode); DBUG_RETURN(mysql_bin_log.write(&qinfo)); } binlog_trans_log_truncate(thd, *(my_off_t*)sv); @@ -1858,8 +2137,9 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg) File file; DBUG_ENTER("open_binlog"); - if ((file = my_open(log_file_name, O_RDONLY | O_BINARY | O_SHARE, - MYF(MY_WME))) < 0) + if ((file= mysql_file_open(key_file_binlog, + log_file_name, O_RDONLY | O_BINARY | O_SHARE, + MYF(MY_WME))) < 0) { sql_print_error("Failed to open log (file '%s', errno %d)", log_file_name, my_errno); @@ -1881,13 +2161,13 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg) err: if (file >= 0) { - my_close(file,MYF(0)); + mysql_file_close(file, MYF(0)); end_io_cache(log); } DBUG_RETURN(-1); } -#ifdef __NT__ +#ifdef _WIN32 static int eventSource = 0; static void setup_windows_event_source() @@ -1922,28 +2202,33 @@ static void setup_windows_event_source() RegCloseKey(hRegKey); } -#endif /* __NT__ */ +#endif /* _WIN32 */ /** Find a unique filename for 'filename.#'. - Set '#' to a number as low as possible. + Set '#' to the number next to the maximum found in the most + recent log file extension. + + This function will return nonzero if: (i) the generated name + exceeds FN_REFLEN; (ii) if the number of extensions is exhausted; + or (iii) some other error happened while examining the filesystem. @return - nonzero if not possible to get unique filename + nonzero if not possible to get unique filename. */ static int find_uniq_filename(char *name) { - long number; uint i; - char buff[FN_REFLEN]; + char buff[FN_REFLEN], ext_buf[FN_REFLEN]; struct st_my_dir *dir_info; reg1 struct fileinfo *file_info; - ulong max_found=0; + ulong max_found= 0, next= 0, number= 0; size_t buf_length, length; char *start, *end; + int error= 0; DBUG_ENTER("find_uniq_filename"); LINT_INIT(number); @@ -1952,16 +2237,16 @@ static int find_uniq_filename(char *name) end= strend(start); *end='.'; - length= (size_t) (end-start+1); + length= (size_t) (end - start + 1); if ((DBUG_EVALUATE_IF("error_unique_log_filename", 1, - !(dir_info = my_dir(buff,MYF(MY_DONT_SORT)))))) + !(dir_info= my_dir(buff,MYF(MY_DONT_SORT)))))) { // This shouldn't happen strmov(end,".1"); // use name+1 DBUG_RETURN(1); } file_info= dir_info->dir_entry; - for (i=dir_info->number_off_files ; i-- ; file_info++) + for (i= dir_info->number_off_files ; i-- ; file_info++) { if (memcmp(file_info->name, start, length) == 0 && test_if_number(file_info->name+length, &number,0)) @@ -1971,8 +2256,52 @@ static int find_uniq_filename(char *name) } my_dirend(dir_info); + /* check if reached the maximum possible extension number */ + if ((max_found == MAX_LOG_UNIQUE_FN_EXT)) + { + sql_print_error("Log filename extension number exhausted: %06lu. \ +Please fix this by archiving old logs and \ +updating the index files.", max_found); + error= 1; + goto end; + } + + next= max_found + 1; + if (sprintf(ext_buf, "%06lu", next)<0) + { + error= 1; + goto end; + } *end++='.'; - DBUG_RETURN((sprintf(end,"%06ld",max_found+1) < 0)); + + /* + Check if the generated extension size + the file name exceeds the + buffer size used. If one did not check this, then the filename might be + truncated, resulting in error. + */ + if (((strlen(ext_buf) + (end - name)) >= FN_REFLEN)) + { + sql_print_error("Log filename too large: %s%s (%zu). \ +Please fix this by archiving old logs and updating the \ +index files.", name, ext_buf, (strlen(ext_buf) + (end - name))); + error= 1; + goto end; + } + + if (sprintf(end, "%06lu", next)<0) + { + error= 1; + goto end; + } + + /* print warning if reaching the end of available extensions. */ + if ((next > (MAX_LOG_UNIQUE_FN_EXT - LOG_WARN_UNIQUE_FN_EXT_LEFT))) + sql_print_warning("Next log extension: %lu. \ +Remaining log filename extensions: %lu. \ +Please consider archiving some logs.", next, (MAX_LOG_UNIQUE_FN_EXT - next)); + +end: + DBUG_RETURN(error); } @@ -2024,7 +2353,11 @@ bool MYSQL_LOG::init_and_set_log_file_name(const char *log_name, 1 error */ -bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, +bool MYSQL_LOG::open( +#ifdef HAVE_PSI_INTERFACE + PSI_file_key log_file_key, +#endif + const char *log_name, enum_log_type log_type_arg, const char *new_name, enum cache_type io_cache_type_arg) { char buff[FN_REFLEN]; @@ -2052,10 +2385,16 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, db[0]= 0; - if ((file= my_open(log_file_name, open_flags, - MYF(MY_WME | ME_WAITTANG))) < 0 || +#ifdef HAVE_PSI_INTERFACE + /* Keep the key for reopen */ + m_log_file_key= log_file_key; +#endif + + if ((file= mysql_file_open(log_file_key, + log_file_name, open_flags, + MYF(MY_WME | ME_WAITTANG))) < 0 || init_io_cache(&log_file, file, IO_SIZE, io_cache_type, - my_tell(file, MYF(MY_WME)), 0, + mysql_file_tell(file, MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP | ((log_type == LOG_BIN) ? MY_WAIT_IF_FULL : 0)))) goto err; @@ -2067,7 +2406,7 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, #ifdef EMBEDDED_LIBRARY "embedded library\n", my_progname, server_version, MYSQL_COMPILATION_COMMENT -#elif __NT__ +#elif _WIN32 "started with:\nTCP Port: %d, Named Pipe: %s\n", my_progname, server_version, MYSQL_COMPILATION_COMMENT, mysqld_port, mysqld_unix_port @@ -2093,9 +2432,10 @@ Turning logging off for the whole duration of the MySQL server process. \ To turn it on again: fix the cause, \ shutdown the MySQL server and restart it.", name, errno); if (file >= 0) - my_close(file, MYF(0)); + mysql_file_close(file, MYF(0)); end_io_cache(&log_file); - safeFree(name); + my_free(name); + name= NULL; log_state= LOG_CLOSED; DBUG_RETURN(1); } @@ -2117,7 +2457,7 @@ void MYSQL_LOG::init_pthread_objects() { DBUG_ASSERT(inited == 0); inited= 1; - (void) pthread_mutex_init(&LOCK_log, MY_MUTEX_INIT_SLOW); + mysql_mutex_init(key_LOG_LOCK_log, &LOCK_log, MY_MUTEX_INIT_SLOW); } /* @@ -2142,13 +2482,13 @@ void MYSQL_LOG::close(uint exiting) { end_io_cache(&log_file); - if (my_sync(log_file.file, MYF(MY_WME)) && ! write_error) + if (mysql_file_sync(log_file.file, MYF(MY_WME)) && ! write_error) { write_error= 1; sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); } - if (my_close(log_file.file, MYF(MY_WME)) && ! write_error) + if (mysql_file_close(log_file.file, MYF(MY_WME)) && ! write_error) { write_error= 1; sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); @@ -2156,7 +2496,8 @@ void MYSQL_LOG::close(uint exiting) } log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED; - safeFree(name); + my_free(name); + name= NULL; DBUG_VOID_RETURN; } @@ -2168,7 +2509,7 @@ void MYSQL_LOG::cleanup() if (inited) { inited= 0; - (void) pthread_mutex_destroy(&LOCK_log); + mysql_mutex_destroy(&LOCK_log); close(0); } DBUG_VOID_RETURN; @@ -2218,7 +2559,7 @@ void MYSQL_QUERY_LOG::reopen_file() DBUG_VOID_RETURN; } - pthread_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_log); save_name= name; name= 0; // Don't free name @@ -2228,10 +2569,14 @@ void MYSQL_QUERY_LOG::reopen_file() Note that at this point, log_state != LOG_CLOSED (important for is_open()). */ - open(save_name, log_type, 0, io_cache_type); - my_free(save_name, MYF(0)); + open( +#ifdef HAVE_PSI_INTERFACE + m_log_file_key, +#endif + save_name, log_type, 0, io_cache_type); + my_free(save_name); - pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_log); DBUG_VOID_RETURN; } @@ -2273,7 +2618,7 @@ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host, struct tm start; uint time_buff_len= 0; - (void) pthread_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_log); /* Test if someone closed between the is_open test and lock */ if (is_open()) @@ -2322,7 +2667,7 @@ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host, goto err; } - (void) pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_log); return FALSE; err: @@ -2331,7 +2676,7 @@ err: write_error= 1; sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); } - (void) pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_log); return TRUE; } @@ -2373,11 +2718,11 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, bool error= 0; DBUG_ENTER("MYSQL_QUERY_LOG::write"); - (void) pthread_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_log); if (!is_open()) { - (void) pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_log); DBUG_RETURN(0); } @@ -2506,7 +2851,7 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, } } } - (void) pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -2539,12 +2884,13 @@ const char *MYSQL_LOG::generate_name(const char *log_name, -MYSQL_BIN_LOG::MYSQL_BIN_LOG() +MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), need_start_event(TRUE), group_commit_queue(0), group_commit_queue_busy(FALSE), num_commits(0), num_group_commits(0), - is_relay_log(0), + sync_period_ptr(sync_period), + is_relay_log(0), signal_cnt(0), checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF), relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), description_event_for_exec(0), description_event_for_queue(0) @@ -2571,9 +2917,9 @@ void MYSQL_BIN_LOG::cleanup() close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT); delete description_event_for_queue; delete description_event_for_exec; - (void) pthread_mutex_destroy(&LOCK_log); - (void) pthread_mutex_destroy(&LOCK_index); - (void) pthread_cond_destroy(&update_cond); + mysql_mutex_destroy(&LOCK_log); + mysql_mutex_destroy(&LOCK_index); + mysql_cond_destroy(&update_cond); } DBUG_VOID_RETURN; } @@ -2592,17 +2938,11 @@ void MYSQL_BIN_LOG::init(bool no_auto_events_arg, ulong max_size_arg) void MYSQL_BIN_LOG::init_pthread_objects() { - DBUG_ASSERT(inited == 0); - inited= 1; - (void) pthread_mutex_init(&LOCK_log, MY_MUTEX_INIT_SLOW); - /* - LOCK_index and LOCK_log are taken in wrong order - Can be seen with 'mysql-test-run ndb.ndb_binlog_basic' - */ - (void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index", - MYF_NO_DEADLOCK_DETECTION); - (void) pthread_cond_init(&update_cond, 0); - (void) pthread_cond_init(&COND_queue_busy, 0); + MYSQL_LOG::init_pthread_objects(); + mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW); + mysql_mutex_setflags(&LOCK_index, MYF_NO_DEADLOCK_DETECTION); + mysql_cond_init(m_key_update_cond, &update_cond, 0); + mysql_cond_init(m_key_COND_queue_busy, &COND_queue_busy, 0); } @@ -2625,23 +2965,25 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg, } fn_format(index_file_name, index_file_name_arg, mysql_data_home, ".index", opt); - if ((index_file_nr= my_open(index_file_name, - O_RDWR | O_CREAT | O_BINARY , - MYF(MY_WME))) < 0 || - my_sync(index_file_nr, MYF(MY_WME)) || + if ((index_file_nr= mysql_file_open(m_key_file_log_index, + index_file_name, + O_RDWR | O_CREAT | O_BINARY, + MYF(MY_WME))) < 0 || + mysql_file_sync(index_file_nr, MYF(MY_WME)) || init_io_cache(&index_file, index_file_nr, IO_SIZE, WRITE_CACHE, - my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), - 0, MYF(MY_WME | MY_WAIT_IF_FULL)) || + mysql_file_seek(index_file_nr, 0L, MY_SEEK_END, MYF(0)), + 0, MYF(MY_WME | MY_WAIT_IF_FULL)) || DBUG_EVALUATE_IF("fault_injection_openning_index", 1, 0)) { /* TODO: all operations creating/deleting the index file or a log, should call my_sync_dir() or my_sync_dir_by_file() to be durable. - TODO: file creation should be done with my_create() not my_open(). + TODO: file creation should be done with mysql_file_create() + not mysql_file_open(). */ if (index_file_nr >= 0) - my_close(index_file_nr,MYF(0)); + mysql_file_close(index_file_nr, MYF(0)); return TRUE; } @@ -2737,8 +3079,11 @@ bool MYSQL_BIN_LOG::open(const char *log_name, write_error= 0; /* open the main log file */ - if (MYSQL_LOG::open(log_name, log_type_arg, new_name, - io_cache_type_arg)) + if (MYSQL_LOG::open( +#ifdef HAVE_PSI_INTERFACE + m_key_file_log, +#endif + log_name, log_type_arg, new_name, io_cache_type_arg)) { #ifdef HAVE_REPLICATION close_purge_index_file(); @@ -2798,7 +3143,6 @@ bool MYSQL_BIN_LOG::open(const char *log_name, if (!s.is_valid()) goto err; s.dont_set_created= null_created_arg; - s.pre_55_writing_direct(); if (s.write(&log_file)) goto err; bytes_written+= s.data_written; @@ -2830,19 +3174,18 @@ bool MYSQL_BIN_LOG::open(const char *log_name, /* Don't set log_pos in event header */ description_event_for_queue->set_artificial_event(); - description_event_for_queue->pre_55_writing_direct(); if (description_event_for_queue->write(&log_file)) goto err; bytes_written+= description_event_for_queue->data_written; } if (flush_io_cache(&log_file) || - my_sync(log_file.file, MYF(MY_WME))) + mysql_file_sync(log_file.file, MYF(MY_WME))) goto err; - pthread_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_lock(&LOCK_commit_ordered); strmake(last_commit_pos_file, log_file_name, sizeof(last_commit_pos_file)-1); last_commit_pos_offset= my_b_tell(&log_file); - pthread_mutex_unlock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_commit_ordered); if (write_file_name_to_index_file) { @@ -2862,7 +3205,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, strlen(log_file_name)) || my_b_write(&index_file, (uchar*) "\n", 1) || flush_io_cache(&index_file) || - my_sync(index_file.file, MYF(MY_WME))) + mysql_file_sync(index_file.file, MYF(MY_WME))) goto err; #ifdef HAVE_REPLICATION @@ -2889,20 +3232,17 @@ Turning logging off for the whole duration of the MySQL server process. \ To turn it on again: fix the cause, \ shutdown the MySQL server and restart it.", name, errno); if (file >= 0) - my_close(file,MYF(0)); - end_io_cache(&log_file); - end_io_cache(&index_file); - safeFree(name); - log_state= LOG_CLOSED; + mysql_file_close(file, MYF(0)); + close(LOG_CLOSE_INDEX); DBUG_RETURN(1); } int MYSQL_BIN_LOG::get_current_log(LOG_INFO* linfo) { - pthread_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_log); int ret = raw_get_current_log(linfo); - pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_log); return ret; } @@ -2942,19 +3282,20 @@ static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset) for (;; offset+= bytes_read) { - (void) my_seek(file, offset, MY_SEEK_SET, MYF(0)); - if ((bytes_read= (int) my_read(file, io_buf, sizeof(io_buf), MYF(MY_WME))) + mysql_file_seek(file, offset, MY_SEEK_SET, MYF(0)); + if ((bytes_read= (int) mysql_file_read(file, io_buf, sizeof(io_buf), + MYF(MY_WME))) < 0) goto err; if (!bytes_read) break; // end of file - (void) my_seek(file, offset-init_offset, MY_SEEK_SET, MYF(0)); - if (my_write(file, io_buf, bytes_read, MYF(MY_WME | MY_NABP))) + mysql_file_seek(file, offset-init_offset, MY_SEEK_SET, MYF(0)); + if (mysql_file_write(file, io_buf, bytes_read, MYF(MY_WME | MY_NABP))) goto err; } /* The following will either truncate the file or fill the end with \n' */ - if (my_chsize(file, offset - init_offset, '\n', MYF(MY_WME)) || - my_sync(file, MYF(MY_WME))) + if (mysql_file_chsize(file, offset - init_offset, '\n', MYF(MY_WME)) || + mysql_file_sync(file, MYF(MY_WME))) goto err; /* Reset data in old index cache */ @@ -2993,18 +3334,33 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name, bool need_lock) { int error= 0; - char *fname= linfo->log_file_name; - uint log_name_len= log_name ? (uint) strlen(log_name) : 0; + char *full_fname= linfo->log_file_name; + char full_log_name[FN_REFLEN], fname[FN_REFLEN]; + uint log_name_len= 0, fname_len= 0; DBUG_ENTER("find_log_pos"); - DBUG_PRINT("enter",("log_name: %s", log_name ? log_name : "NULL")); + full_log_name[0]= full_fname[0]= 0; /* Mutex needed because we need to make sure the file pointer does not move from under our feet */ if (need_lock) - pthread_mutex_lock(&LOCK_index); - safe_mutex_assert_owner(&LOCK_index); + mysql_mutex_lock(&LOCK_index); + mysql_mutex_assert_owner(&LOCK_index); + + // extend relative paths for log_name to be searched + if (log_name) + { + if(normalize_binlog_name(full_log_name, log_name, is_relay_log)) + { + error= LOG_INFO_EOF; + goto end; + } + } + + log_name_len= log_name ? (uint) strlen(full_log_name) : 0; + DBUG_PRINT("enter", ("log_name: %s, full_log_name: %s", + log_name ? log_name : "NULL", full_log_name)); /* As the file is flushed, we can't get an error here */ (void) reinit_io_cache(&index_file, READ_CACHE, (my_off_t) 0, 0, 0); @@ -3013,8 +3369,10 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name, { uint length; my_off_t offset= my_b_tell(&index_file); - /* If we get 0 or 1 characters, this is the end of the file */ + DBUG_EXECUTE_IF("simulate_find_log_pos_error", + error= LOG_INFO_EOF; break;); + /* If we get 0 or 1 characters, this is the end of the file */ if ((length= my_b_gets(&index_file, fname, FN_REFLEN)) <= 1) { /* Did not find the given entry; Return not found or error */ @@ -3022,21 +3380,30 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name, break; } + // extend relative paths and match against full path + if (normalize_binlog_name(full_fname, fname, is_relay_log)) + { + error= LOG_INFO_EOF; + break; + } + fname_len= (uint) strlen(full_fname); + // if the log entry matches, null string matching anything if (!log_name || - (log_name_len == length-1 && fname[log_name_len] == '\n' && - !memcmp(fname, log_name, log_name_len))) + (log_name_len == fname_len-1 && full_fname[log_name_len] == '\n' && + !memcmp(full_fname, full_log_name, log_name_len))) { - DBUG_PRINT("info",("Found log file entry")); - fname[length-1]=0; // remove last \n + DBUG_PRINT("info", ("Found log file entry")); + full_fname[fname_len-1]= 0; // remove last \n linfo->index_file_start_offset= offset; linfo->index_file_offset = my_b_tell(&index_file); break; } } +end: if (need_lock) - pthread_mutex_unlock(&LOCK_index); + mysql_mutex_unlock(&LOCK_index); DBUG_RETURN(error); } @@ -3069,11 +3436,12 @@ int MYSQL_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock) { int error= 0; uint length; - char *fname= linfo->log_file_name; + char fname[FN_REFLEN]; + char *full_fname= linfo->log_file_name; if (need_lock) - pthread_mutex_lock(&LOCK_index); - safe_mutex_assert_owner(&LOCK_index); + mysql_mutex_lock(&LOCK_index); + mysql_mutex_assert_owner(&LOCK_index); /* As the file is flushed, we can't get an error here */ (void) reinit_io_cache(&index_file, READ_CACHE, linfo->index_file_offset, 0, @@ -3085,12 +3453,23 @@ int MYSQL_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock) error = !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO; goto err; } - fname[length-1]=0; // kill \n - linfo->index_file_offset = my_b_tell(&index_file); + + if (fname[0] != 0) + { + if(normalize_binlog_name(full_fname, fname, is_relay_log)) + { + error= LOG_INFO_EOF; + goto err; + } + length= strlen(full_fname); + } + + full_fname[length-1]= 0; // kill \n + linfo->index_file_offset= my_b_tell(&index_file); err: if (need_lock) - pthread_mutex_unlock(&LOCK_index); + mysql_mutex_unlock(&LOCK_index); return error; } @@ -3116,6 +3495,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) { LOG_INFO linfo; bool error=0; + int err; const char* save_name; DBUG_ENTER("reset_logs"); @@ -3124,8 +3504,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) We need to get both locks to be sure that no one is trying to write to the index log file. */ - pthread_mutex_lock(&LOCK_log); - pthread_mutex_lock(&LOCK_index); + mysql_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_index); /* The following mutex is needed to ensure that no threads call @@ -3133,7 +3513,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) thread. If the transaction involved MyISAM tables, it should go into binlog even on rollback. */ - pthread_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_thread_count); /* Save variables so that we can reopen the log */ save_name=name; @@ -3149,9 +3529,13 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) We need to invert the steps and use the purge_index_file methods in order to make the operation safe. */ - if (find_log_pos(&linfo, NullS, 0)) + + if ((err= find_log_pos(&linfo, NullS, 0)) != 0) { - error=1; + uint errcode= purge_log_get_error_code(err); + sql_print_error("Failed to locate old binlog or relay log files"); + my_message(errcode, ER(errcode), MYF(0)); + error= 1; goto err; } @@ -3218,12 +3602,14 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) if (!open_index_file(index_file_name, 0, FALSE)) if ((error= open(save_name, log_type, 0, io_cache_type, no_auto_events, max_size, 0, FALSE))) goto err; - my_free((uchar*) save_name, MYF(0)); + my_free((void *) save_name); err: - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - pthread_mutex_unlock(&LOCK_index); - pthread_mutex_unlock(&LOCK_log); + if (error == 1) + name= const_cast<char*>(save_name); + mysql_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_index); + mysql_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -3277,7 +3663,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) DBUG_ASSERT(rli->slave_running == 1); DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name)); - pthread_mutex_lock(&LOCK_index); + mysql_mutex_lock(&LOCK_index); to_purge_if_included= my_strdup(rli->group_relay_log_name, MYF(0)); /* @@ -3321,19 +3707,19 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) DBUG_EXECUTE_IF("crash_before_purge_logs", DBUG_SUICIDE();); - pthread_mutex_lock(&rli->log_space_lock); + mysql_mutex_lock(&rli->log_space_lock); rli->relay_log.purge_logs(to_purge_if_included, included, 0, 0, &rli->log_space_total); // Tell the I/O thread to take the relay_log_space_limit into account rli->ignore_log_space_limit= 0; - pthread_mutex_unlock(&rli->log_space_lock); + mysql_mutex_unlock(&rli->log_space_lock); /* Ok to broadcast after the critical region as there is no risk of the mutex being destroyed by this thread later - this helps save context switches */ - pthread_cond_broadcast(&rli->log_space_cond); + mysql_cond_broadcast(&rli->log_space_cond); /* * Need to update the log pos because purge logs has been called @@ -3354,8 +3740,8 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) DBUG_ASSERT(!included || rli->linfo.index_file_start_offset == 0); err: - my_free(to_purge_if_included, MYF(0)); - pthread_mutex_unlock(&LOCK_index); + my_free(to_purge_if_included); + mysql_mutex_unlock(&LOCK_index); DBUG_RETURN(error); } @@ -3395,7 +3781,7 @@ int MYSQL_BIN_LOG::update_log_index(LOG_INFO* log_info, bool need_update_threads LOG_INFO_EOF to_log not found LOG_INFO_EMFILE too many files opened LOG_INFO_FATAL if any other than ENOENT error from - my_stat() or my_delete() + mysql_file_stat() or mysql_file_delete() */ int MYSQL_BIN_LOG::purge_logs(const char *to_log, @@ -3412,7 +3798,7 @@ int MYSQL_BIN_LOG::purge_logs(const char *to_log, DBUG_PRINT("info",("to_log= %s",to_log)); if (need_mutex) - pthread_mutex_lock(&LOCK_index); + mysql_mutex_lock(&LOCK_index); if ((error=find_log_pos(&log_info, to_log, 0 /*no mutex*/))) { sql_print_error("MYSQL_BIN_LOG::purge_logs was called with file %s not " @@ -3475,7 +3861,7 @@ err: DBUG_EXECUTE_IF("crash_purge_non_critical_after_update_index", DBUG_SUICIDE();); if (need_mutex) - pthread_mutex_unlock(&LOCK_index); + mysql_mutex_unlock(&LOCK_index); DBUG_RETURN(error); } @@ -3539,8 +3925,7 @@ int MYSQL_BIN_LOG::close_purge_index_file() bool MYSQL_BIN_LOG::is_inited_purge_index_file() { - DBUG_ENTER("MYSQL_BIN_LOG::is_inited_purge_index_file"); - DBUG_RETURN (my_b_inited(&purge_index_file)); + return my_b_inited(&purge_index_file); } int MYSQL_BIN_LOG::sync_purge_index_file() @@ -3576,13 +3961,12 @@ int MYSQL_BIN_LOG::register_create_index_entry(const char *entry) int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space, bool need_mutex) { + DBUG_ENTER("MYSQL_BIN_LOG:purge_index_entry"); MY_STAT s; int error= 0; LOG_INFO log_info; LOG_INFO check_log_info; - DBUG_ENTER("MYSQL_BIN_LOG:purge_index_entry"); - DBUG_ASSERT(my_b_inited(&purge_index_file)); if ((error=reinit_io_cache(&purge_index_file, READ_CACHE, 0, 0, 0))) @@ -3614,7 +3998,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space, /* Get rid of the trailing '\n' */ log_info.log_file_name[length-1]= 0; - if (!my_stat(log_info.log_file_name, &s, MYF(0))) + if (!mysql_file_stat(m_key_file_log, log_info.log_file_name, &s, MYF(0))) { if (my_errno == ENOENT) { @@ -3628,7 +4012,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space, ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE), log_info.log_file_name); } - sql_print_information("Failed to execute my_stat on file '%s'", + sql_print_information("Failed to execute mysql_file_stat on file '%s'", log_info.log_file_name); my_errno= 0; } @@ -3766,7 +4150,7 @@ err: @retval LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated LOG_INFO_FATAL if any other than ENOENT error from - my_stat() or my_delete() + mysql_file_stat() or mysql_file_delete() */ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) @@ -3779,7 +4163,7 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) DBUG_ENTER("purge_logs_before_date"); - pthread_mutex_lock(&LOCK_index); + mysql_mutex_lock(&LOCK_index); to_log[0]= 0; if ((error=find_log_pos(&log_info, NullS, 0 /*no mutex*/))) @@ -3789,7 +4173,8 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) !is_active(log_info.log_file_name) && !log_in_use(log_info.log_file_name)) { - if (!my_stat(log_info.log_file_name, &stat_area, MYF(0))) + if (!mysql_file_stat(m_key_file_log, + log_info.log_file_name, &stat_area, MYF(0))) { if (my_errno == ENOENT) { @@ -3838,7 +4223,7 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) error= (to_log[0] ? purge_logs(to_log, 1, 0, 1, (ulonglong *) 0) : 0); err: - pthread_mutex_unlock(&LOCK_index); + mysql_mutex_unlock(&LOCK_index); DBUG_RETURN(error); } #endif /* HAVE_REPLICATION */ @@ -3924,11 +4309,11 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) } if (need_lock) - pthread_mutex_lock(&LOCK_log); - pthread_mutex_lock(&LOCK_index); + mysql_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_index); - safe_mutex_assert_owner(&LOCK_log); - safe_mutex_assert_owner(&LOCK_index); + mysql_mutex_assert_owner(&LOCK_log); + mysql_mutex_assert_owner(&LOCK_index); /* if binlog is used as tc log, be sure all xids are "unlogged", @@ -3942,12 +4327,12 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) if (prepared_xids) { tc_log_page_waits++; - pthread_mutex_lock(&LOCK_prep_xids); + mysql_mutex_lock(&LOCK_prep_xids); while (prepared_xids) { DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids)); - pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids); + mysql_cond_wait(&COND_prep_xids, &LOCK_prep_xids); } - pthread_mutex_unlock(&LOCK_prep_xids); + mysql_mutex_unlock(&LOCK_prep_xids); } /* Reuse old name if not binlog and not update log */ @@ -3978,7 +4363,6 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) */ if (is_relay_log) r.checksum_alg= relay_log_checksum_alg; - r.pre_55_writing_direct(); DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) || (error= r.write(&log_file))) @@ -4038,7 +4422,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) close_on_error= TRUE; } - my_free(old_name,MYF(0)); + my_free(old_name); end: @@ -4066,8 +4450,8 @@ end: } if (need_lock) - pthread_mutex_unlock(&LOCK_log); - pthread_mutex_unlock(&LOCK_index); + mysql_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_index); DBUG_RETURN(error); } @@ -4076,7 +4460,7 @@ end: bool MYSQL_BIN_LOG::append(Log_event* ev) { bool error = 0; - pthread_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_log); DBUG_ENTER("MYSQL_BIN_LOG::append"); DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); @@ -4084,7 +4468,6 @@ bool MYSQL_BIN_LOG::append(Log_event* ev) Log_event::write() is smart enough to use my_b_write() or my_b_append() depending on the kind of cache we have. */ - ev->pre_55_writing_direct(); if (ev->write(&log_file)) { error=1; @@ -4092,10 +4475,12 @@ bool MYSQL_BIN_LOG::append(Log_event* ev) } bytes_written+= ev->data_written; DBUG_PRINT("info",("max_size: %lu",max_size)); + if (flush_and_sync(0)) + goto err; if ((uint) my_b_append_tell(&log_file) > max_size) error= new_file_without_locking(); err: - pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_log); signal_update(); // Safe as we don't call close DBUG_RETURN(error); } @@ -4110,7 +4495,7 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...) DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); - safe_mutex_assert_owner(&LOCK_log); + mysql_mutex_assert_owner(&LOCK_log); do { if (my_b_append(&log_file,(uchar*) buf,len)) @@ -4121,6 +4506,8 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...) bytes_written += len; } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint))); DBUG_PRINT("info",("max_size: %lu",max_size)); + if (flush_and_sync(0)) + goto err; if ((uint) my_b_append_tell(&log_file) > max_size) error= new_file_without_locking(); err: @@ -4129,17 +4516,21 @@ err: DBUG_RETURN(error); } - -bool MYSQL_BIN_LOG::flush_and_sync() +bool MYSQL_BIN_LOG::flush_and_sync(bool *synced) { int err=0, fd=log_file.file; - safe_mutex_assert_owner(&LOCK_log); + if (synced) + *synced= 0; + mysql_mutex_assert_owner(&LOCK_log); if (flush_io_cache(&log_file)) return 1; - if (++sync_binlog_counter >= sync_binlog_period && sync_binlog_period) + uint sync_period= get_sync_period(); + if (sync_period && ++sync_counter >= sync_period) { - sync_binlog_counter= 0; - err=my_sync(fd, MYF(MY_WME)); + sync_counter= 0; + err= mysql_file_sync(fd, MYF(MY_WME)); + if (synced) + *synced= 1; #ifndef DBUG_OFF if (opt_binlog_dbug_fsync_sleep > 0) my_sleep(opt_binlog_dbug_fsync_sleep); @@ -4169,6 +4560,72 @@ bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param) query_id_param >= thd->binlog_evt_union.first_query_id); } +/** + This function checks if a transactional table was updated by the + current transaction. + + @param thd The client thread that executed the current statement. + @return + @c true if a transactional table was updated, @c false otherwise. +*/ +bool +trans_has_updated_trans_table(const THD* thd) +{ + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + return (cache_mngr ? !cache_mngr->trx_cache.empty() : 0); +} + +/** + This function checks if a transactional table was updated by the + current statement. + + @param thd The client thread that executed the current statement. + @return + @c true if a transactional table was updated, @c false otherwise. +*/ +bool +stmt_has_updated_trans_table(const THD *thd) +{ + Ha_trx_info *ha_info; + + for (ha_info= thd->transaction.stmt.ha_list; ha_info; + ha_info= ha_info->next()) + { + if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) + return (TRUE); + } + return (FALSE); +} + +/** + This function checks if either a trx-cache or a non-trx-cache should + be used. If @c bin_log_direct_non_trans_update is active or the format + is either MIXED or ROW, the cache to be used depends on the flag @c + is_transactional. + + On the other hand, if binlog_format is STMT or direct option is + OFF, the trx-cache should be used if and only if the statement is + transactional or the trx-cache is not empty. Otherwise, the + non-trx-cache should be used. + + @param thd The client thread. + @param is_transactional The changes are related to a trx-table. + @return + @c true if a trx-cache should be used, @c false otherwise. +*/ +bool use_trans_cache(const THD* thd, bool is_transactional) +{ + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + return + ((thd->is_current_stmt_binlog_format_row() || + thd->variables.binlog_direct_non_trans_update) ? is_transactional : + (is_transactional || !cache_mngr->trx_cache.empty())); +} + /** This function checks if a transaction, either a multi-statement or a single statement transaction is about to commit or not. @@ -4179,43 +4636,40 @@ bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param) @return @c true if committing a transaction, otherwise @c false. */ -bool ending_trans(const THD* thd, const bool all) +bool ending_trans(THD* thd, const bool all) { - return (all || (!all && !(thd->options & - (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))); + return (all || ending_single_stmt_trans(thd, all)); } /** - This function checks if a non-transactional table was updated by - the current transaction. + This function checks if a single statement transaction is about + to commit or not. @param thd The client thread that executed the current statement. + @param all Committing a transaction (i.e. TRUE) or a statement + (i.e. FALSE). @return - @c true if a non-transactional table was updated, @c false - otherwise. + @c true if committing a single statement transaction, otherwise + @c false. */ -bool trans_has_updated_non_trans_table(const THD* thd) +bool ending_single_stmt_trans(THD* thd, const bool all) { - return (thd->transaction.all.modified_non_trans_table || - thd->transaction.stmt.modified_non_trans_table); + return (!all && !thd->in_multi_stmt_transaction_mode()); } /** - This function checks if any statement was committed and cached. + This function checks if a non-transactional table was updated by + the current transaction. @param thd The client thread that executed the current statement. - @param all Committing a transaction (i.e. TRUE) or a statement - (i.e. FALSE). @return - @c true if at a statement was committed and cached, @c false + @c true if a non-transactional table was updated, @c false otherwise. */ -bool trans_has_no_stmt_committed(const THD* thd, bool all) +bool trans_has_updated_non_trans_table(const THD* thd) { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - - return (!all && !trx_data->at_least_one_stmt_committed); + return (thd->transaction.all.modified_non_trans_table || + thd->transaction.stmt.modified_non_trans_table); } /** @@ -4239,24 +4693,31 @@ bool stmt_has_updated_non_trans_table(const THD* thd) int THD::binlog_setup_trx_data() { DBUG_ENTER("THD::binlog_setup_trx_data"); - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); - if (trx_data) + if (cache_mngr) DBUG_RETURN(0); // Already set up - trx_data= (binlog_trx_data*) my_malloc(sizeof(binlog_trx_data), MYF(MY_ZEROFILL)); - if (!trx_data || - open_cached_file(&trx_data->trans_log, mysql_tmpdir, + cache_mngr= (binlog_cache_mngr*) my_malloc(sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL)); + if (!cache_mngr || + open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir, + LOG_PREFIX, binlog_stmt_cache_size, MYF(MY_WME)) || + open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir, LOG_PREFIX, binlog_cache_size, MYF(MY_WME))) { - my_free((uchar*)trx_data, MYF(MY_ALLOW_ZERO_PTR)); + my_free(cache_mngr); DBUG_RETURN(1); // Didn't manage to set it up } - thd_set_ha_data(this, binlog_hton, trx_data); - - trx_data= new (thd_get_ha_data(this, binlog_hton)) binlog_trx_data; + thd_set_ha_data(this, binlog_hton, cache_mngr); + cache_mngr= new (thd_get_ha_data(this, binlog_hton)) + binlog_cache_mngr(max_binlog_stmt_cache_size, + max_binlog_cache_size, + &binlog_stmt_cache_use, + &binlog_stmt_cache_disk_use, + &binlog_cache_use, + &binlog_cache_disk_use); DBUG_RETURN(0); } @@ -4273,11 +4734,10 @@ int THD::binlog_setup_trx_data() - Start a transaction if not in autocommit mode or if a BEGIN statement has been seen. - - Start a statement transaction to allow us to truncate the binary - log. + - Start a statement transaction to allow us to truncate the cache. - Save the currrent binlog position so that we can roll back the - statement by truncating the transaction log. + statement by truncating the cache. We only update the saved position if the old one was undefined, the reason is that there are some cases (e.g., for CREATE-SELECT) @@ -4291,18 +4751,18 @@ int THD::binlog_setup_trx_data() void THD::binlog_start_trans_and_stmt() { - binlog_trx_data *trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); DBUG_ENTER("binlog_start_trans_and_stmt"); - DBUG_PRINT("enter", ("trx_data: 0x%lx trx_data->before_stmt_pos: %lu", - (long) trx_data, - (trx_data ? (ulong) trx_data->before_stmt_pos : + DBUG_PRINT("enter", ("cache_mngr: %p cache_mngr->trx_cache.get_prev_position(): %lu", + cache_mngr, + (cache_mngr ? (ulong) cache_mngr->trx_cache.get_prev_position() : (ulong) 0))); - if (trx_data == NULL || - trx_data->before_stmt_pos == MY_OFF_T_UNDEF) + if (cache_mngr == NULL || + cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF) { this->binlog_set_stmt_begin(); - if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + if (in_multi_stmt_transaction_mode()) trans_register_ha(this, TRUE, binlog_hton); trans_register_ha(this, FALSE, binlog_hton); /* @@ -4320,47 +4780,57 @@ THD::binlog_start_trans_and_stmt() } void THD::binlog_set_stmt_begin() { - binlog_trx_data *trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); /* - The call to binlog_trans_log_savepos() might create the trx_data + The call to binlog_trans_log_savepos() might create the cache_mngr structure, if it didn't exist before, so we save the position into an auto variable and then write it into the transaction - data for the binary log (i.e., trx_data). + data for the binary log (i.e., cache_mngr). */ my_off_t pos= 0; binlog_trans_log_savepos(this, &pos); - trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); - trx_data->before_stmt_pos= pos; + cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + cache_mngr->trx_cache.set_prev_position(pos); } static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd) { int err= 0; - binlog_trx_data *trx_data; DBUG_ENTER("binlog_start_consistent_snapshot"); thd->binlog_setup_trx_data(); - trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); /* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */ - strmake(trx_data->last_commit_pos_file, mysql_bin_log.last_commit_pos_file, - sizeof(trx_data->last_commit_pos_file)-1); - trx_data->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset; + strmake(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file, + sizeof(cache_mngr->last_commit_pos_file)-1); + cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset; trans_register_ha(thd, TRUE, hton); DBUG_RETURN(err); } -/* - Write a table map to the binary log. If with_annotate != NULL and - *with_annotate = TRUE write also Annotate_rows before the table map. - */ +/** + This function writes a table map to the binary log. + Note that in order to keep the signature uniform with related methods, + we use a redundant parameter to indicate whether a transactional table + was changed or not. -int THD::binlog_write_table_map(TABLE *table, bool is_trans, + If with_annotate != NULL and + *with_annotate = TRUE write also Annotate_rows before the table map. + + @param table a pointer to the table. + @param is_transactional @c true indicates a transactional table, + otherwise @c false a non-transactional. + @return + nonzero if an error pops up when writing the table map event. +*/ +int THD::binlog_write_table_map(TABLE *table, bool is_transactional, my_bool *with_annotate) { int error; @@ -4370,145 +4840,177 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans, table->s->table_map_id)); /* Pre-conditions */ - DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); Table_map_log_event - the_event(this, table, table->s->table_map_id, is_trans); + the_event(this, table, table->s->table_map_id, is_transactional); - if (is_trans && binlog_table_maps == 0) + if (binlog_table_maps == 0) binlog_start_trans_and_stmt(); - if ((error= mysql_bin_log.write(&the_event, with_annotate))) + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + + IO_CACHE *file= + cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional)); + if (with_annotate && *with_annotate) + { + Annotate_rows_log_event anno(current_thd, is_transactional, false); + /* Annotate event should be written not more than once */ + *with_annotate= 0; + if ((error= anno.write(file))) + DBUG_RETURN(error); + } + if ((error= the_event.write(file))) DBUG_RETURN(error); binlog_table_maps++; DBUG_RETURN(0); } +/** + This function retrieves a pending row event from a cache which is + specified through the parameter @c is_transactional. Respectively, when it + is @c true, the pending event is returned from the transactional cache. + Otherwise from the non-transactional cache. + + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. + @return + The row event if any. +*/ Rows_log_event* -THD::binlog_get_pending_rows_event() const +THD::binlog_get_pending_rows_event(bool is_transactional) const { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + Rows_log_event* rows= NULL; + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + /* - This is less than ideal, but here's the story: If there is no - trx_data, prepare_pending_rows_event() has never been called - (since the trx_data is set up there). In that case, we just return - NULL. + This is less than ideal, but here's the story: If there is no cache_mngr, + prepare_pending_rows_event() has never been called (since the cache_mngr + is set up there). In that case, we just return NULL. */ - return trx_data ? trx_data->pending() : NULL; + if (cache_mngr) + { + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional)); + + rows= cache_data->pending(); + } + return (rows); } +/** + This function stores a pending row event into a cache which is specified + through the parameter @c is_transactional. Respectively, when it is @c + true, the pending event is stored into the transactional cache. Otherwise + into the non-transactional cache. + + @param evt a pointer to the row event. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. +*/ void -THD::binlog_set_pending_rows_event(Rows_log_event* ev) +THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional) { if (thd_get_ha_data(this, binlog_hton) == NULL) binlog_setup_trx_data(); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(this, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); + + DBUG_ASSERT(cache_mngr); + + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional)); - DBUG_ASSERT(trx_data); - trx_data->set_pending(ev); + cache_data->set_pending(ev); } /** - Remove the pending rows event, discarding any outstanding rows. - - If there is no pending rows event available, this is effectively a + This function removes the pending rows event, discarding any outstanding + rows. If there is no pending rows event available, this is effectively a no-op. - */ + + @param thd a pointer to the user thread. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. +*/ int -MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd) +MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::remove_pending_rows_event"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + DBUG_ASSERT(cache_mngr); - DBUG_ASSERT(trx_data); + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional)); - if (Rows_log_event* pending= trx_data->pending()) + if (Rows_log_event* pending= cache_data->pending()) { delete pending; - trx_data->set_pending(NULL); + cache_data->set_pending(NULL); } DBUG_RETURN(0); } /* - Moves the last bunch of rows from the pending Rows event to the binlog - (either cached binlog if transaction, or disk binlog). Sets a new pending - event. + Moves the last bunch of rows from the pending Rows event to a cache (either + transactional cache if is_transaction is @c true, or the non-transactional + cache otherwise. Sets a new pending event. + + @param thd a pointer to the user thread. + @param evt a pointer to the row event. + @param is_transactional @c true indicates a transactional cache, + otherwise @c false a non-transactional. */ int MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, - Rows_log_event* event) + Rows_log_event* event, + bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ASSERT(mysql_bin_log.is_open()); DBUG_PRINT("enter", ("event: 0x%lx", (long) event)); int error= 0; + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + DBUG_ASSERT(cache_mngr); - DBUG_ASSERT(trx_data); + binlog_cache_data *cache_data= + cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional)); - DBUG_PRINT("info", ("trx_data->pending(): 0x%lx", (long) trx_data->pending())); + DBUG_PRINT("info", ("cache_mngr->pending(): 0x%lx", (long) cache_data->pending())); - if (Rows_log_event* pending= trx_data->pending()) + if (Rows_log_event* pending= cache_data->pending()) { + IO_CACHE *file= &cache_data->cache_log; + /* - Decide if we should write to the log file directly or to the - transaction log. + Write pending event to the cache. */ - if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log)) - { - /* Write to transaction log/cache. */ - if (pending->write(&trx_data->trans_log)) - { - set_write_error(thd); - DBUG_RETURN(1); - } - } - else + if (pending->write(file)) { - /* Write directly to log file. */ - pthread_mutex_lock(&LOCK_log); - pending->pre_55_writing_direct(); - if (pending->write(&log_file)) - { - pthread_mutex_unlock(&LOCK_log); - set_write_error(thd); - DBUG_RETURN(1); - } - - error= flush_and_sync(); - if (!error) - { - signal_update(); - error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); - } - - /* - Take mutex to protect against a reader seeing partial writes of 64-bit - offset on 32-bit CPUs. - */ - pthread_mutex_lock(&LOCK_commit_ordered); - last_commit_pos_offset= my_b_tell(&log_file); - pthread_mutex_unlock(&LOCK_commit_ordered); - pthread_mutex_unlock(&LOCK_log); + set_write_error(thd, is_transactional); + if (check_write_error(thd) && cache_data && + stmt_has_updated_non_trans_table(thd)) + cache_data->set_incident(); + DBUG_RETURN(1); } delete pending; } - thd->binlog_set_pending_rows_event(event); + thd->binlog_set_pending_rows_event(event, is_transactional); DBUG_RETURN(error); } @@ -4523,8 +5025,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) { THD *thd= event_info->thd; bool error= 1; - uint16 cache_type; DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); + binlog_cache_data *cache_data= 0; + bool is_trans_cache= FALSE; + bool using_trans= event_info->use_trans_cache(); + bool direct= event_info->use_direct_logging(); if (thd->binlog_evt_union.do_union) { @@ -4533,7 +5038,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) We will log the function call to the binary log on function exit */ thd->binlog_evt_union.unioned_events= TRUE; - thd->binlog_evt_union.unioned_events_trans |= event_info->cache_stmt; + thd->binlog_evt_union.unioned_events_trans |= using_trans; DBUG_RETURN(0); } @@ -4543,8 +5048,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) this will close all tables on the slave. */ bool const end_stmt= - thd->prelocked_mode && thd->lex->requires_prelocking(); - if (thd->binlog_flush_pending_rows_event(end_stmt)) + thd->locked_tables_mode && thd->lex->requires_prelocking(); + if (thd->binlog_flush_pending_rows_event(end_stmt, using_trans)) DBUG_RETURN(error); /* @@ -4554,8 +5059,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) */ if (likely(is_open())) { - IO_CACHE *file= &log_file; - my_off_t my_org_b_tell; + my_off_t UNINIT_VAR(my_org_b_tell); #ifdef HAVE_REPLICATION /* In the future we need to add to the following if tests like @@ -4563,110 +5067,71 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) binlog_[wild_]{do|ignore}_table?" (WL#1049)" */ const char *local_db= event_info->get_db(); - if ((!(thd->options & OPTION_BIN_LOG)) || + if ((!(thd->variables.option_bits & OPTION_BIN_LOG)) || (thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT && thd->lex->sql_command != SQLCOM_SAVEPOINT && !binlog_filter->db_ok(local_db))) - { DBUG_RETURN(0); - } #endif /* HAVE_REPLICATION */ - my_org_b_tell= my_b_tell(file); + IO_CACHE *file= NULL; -#if defined(USING_TRANSACTIONS) - /* - Should we write to the binlog cache or to the binlog on disk? - - Write to the binlog cache if: - 1 - a transactional engine/table is updated (stmt_has_updated_trans_table == TRUE); - 2 - or the event asks for it (cache_stmt == TRUE); - 3 - or the cache is already not empty (meaning we're in a transaction; - note that the present event could be about a non-transactional table, but - still we need to write to the binlog cache in that case to handle updates - to mixed trans/non-trans table types). - - Write to the binlog on disk if only a non-transactional engine is - updated and: - 1 - the binlog cache is empty or; - 2 - --binlog-direct-non-transactional-updates is set and we are about to - use the statement format. When using the row format (cache_stmt == TRUE). - */ - if (opt_using_transactions) + if (direct) + { + file= &log_file; + my_org_b_tell= my_b_tell(file); + mysql_mutex_lock(&LOCK_log); + } + else { if (thd->binlog_setup_trx_data()) goto err; - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - IO_CACHE *trans_log= &trx_data->trans_log; - my_off_t trans_log_pos= my_b_tell(trans_log); - if (event_info->get_cache_stmt() || stmt_has_updated_trans_table(thd) || - (!thd->variables.binlog_direct_non_trans_update && - trans_log_pos != 0)) - { - DBUG_PRINT("info", ("Using trans_log: cache: %d, trans_log_pos: %lu", - event_info->get_cache_stmt(), - (ulong) trans_log_pos)); - thd->binlog_start_trans_and_stmt(); - file= trans_log; - } + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + is_trans_cache= use_trans_cache(thd, using_trans); + file= cache_mngr->get_binlog_cache_log(is_trans_cache); + cache_data= cache_mngr->get_binlog_cache_data(is_trans_cache); + + if (thd->lex->stmt_accessed_non_trans_temp_table()) + cache_data->set_changes_to_non_trans_temp_table(); + + thd->binlog_start_trans_and_stmt(); } -#endif /* USING_TRANSACTIONS */ DBUG_PRINT("info",("event type: %d",event_info->get_type_code())); - if (file == &log_file) - { - pthread_mutex_lock(&LOCK_log); - /* - We did not want to take LOCK_log unless really necessary. - However, now that we hold LOCK_log, we must check is_open() again, lest - the log was closed just before. - */ - if (unlikely(!is_open())) - { - pthread_mutex_unlock(&LOCK_log); - DBUG_RETURN(error); - } - event_info->pre_55_writing_direct(); - } - - cache_type= event_info->cache_type; /* - No check for auto events flag here - this write method should - never be called if auto-events are enabled - */ + No check for auto events flag here - this write method should + never be called if auto-events are enabled. - /* - 1. Write first log events which describe the 'run environment' - of the SQL command + Write first log events which describe the 'run environment' + of the SQL command. If row-based binlogging, Insert_id, Rand + and other kind of "setting context" events are not needed. */ if (with_annotate && *with_annotate) { DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT); - Annotate_rows_log_event anno(thd, cache_type); + Annotate_rows_log_event anno(thd, using_trans, direct); /* Annotate event should be written not more than once */ *with_annotate= 0; if (anno.write(file)) goto err; } - /* - If row-based binlogging, Insert_id, Rand and other kind of "setting - context" events are not needed. - */ + if (thd) { - if (!thd->current_stmt_binlog_row_based) + if (!thd->is_current_stmt_binlog_format_row()) { if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt) { Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, thd->first_successful_insert_id_in_prev_stmt_for_binlog, - cache_type); + using_trans, direct); if (e.write(file)) - goto err_unlock; + goto err; } if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0) { @@ -4675,16 +5140,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) nb_elements())); Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT, thd->auto_inc_intervals_in_cur_stmt_for_binlog. - minimum(), cache_type); + minimum(), using_trans, direct); if (e.write(file)) - goto err_unlock; + goto err; } if (thd->rand_used) { Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2, - cache_type); + using_trans, direct); if (e.write(file)) - goto err_unlock; + goto err; } if (thd->user_var_events.elements) { @@ -4692,56 +5157,85 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) { BINLOG_USER_VAR_EVENT *user_var_event; get_dynamic(&thd->user_var_events,(uchar*) &user_var_event, i); + + /* setting flags for user var log event */ + uchar flags= User_var_log_event::UNDEF_F; + if (user_var_event->unsigned_flag) + flags|= User_var_log_event::UNSIGNED_F; + User_var_log_event e(thd, user_var_event->user_var_event->name.str, user_var_event->user_var_event->name.length, user_var_event->value, user_var_event->length, user_var_event->type, user_var_event->charset_number, - cache_type); + flags, + using_trans, + direct); if (e.write(file)) - goto err_unlock; + goto err; } } } } - /* Write the SQL command */ - if (event_info->write(file) || + /* + Write the event. + */ + if (event_info->write(file) || DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) - goto err_unlock; + goto err; - if (file == &log_file) // we are writing to the real log (disk) + error= 0; +err: + if (direct) { - ulonglong data_written= (my_b_tell(file) - my_org_b_tell); - status_var_add(thd->status_var.binlog_bytes_written, data_written); + my_off_t offset= my_b_tell(file); + bool check_purge= false; - if (flush_and_sync()) - goto err_unlock; - signal_update(); - if ((error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED))) - goto err_unlock; - - } - error=0; + if (!error) + { + bool synced; + + if ((error= flush_and_sync(&synced))) + { + } + else if ((error= RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, file->pos_in_file, synced)))) + { + sql_print_error("Failed to run 'after_flush' hooks"); + } + else + { + signal_update(); + if ((error= rotate(false, &check_purge))) + check_purge= false; + } + } + + status_var_add(thd->status_var.binlog_bytes_written, + offset - my_org_b_tell); -err_unlock: - if (file == &log_file) - { - my_off_t offset= my_b_tell(&log_file); /* Take mutex to protect against a reader seeing partial writes of 64-bit offset on 32-bit CPUs. */ - pthread_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_lock(&LOCK_commit_ordered); last_commit_pos_offset= offset; - pthread_mutex_unlock(&LOCK_commit_ordered); - pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_log); + + if (check_purge) + purge(); } -err: if (error) - set_write_error(thd); + { + set_write_error(thd, is_trans_cache); + if (check_write_error(thd) && cache_data && + stmt_has_updated_non_trans_table(thd)) + cache_data->set_incident(); + } } DBUG_RETURN(error); @@ -4773,7 +5267,7 @@ bool LOGGER::log_command(THD *thd, enum enum_server_command command) */ if (*general_log_handler_list && (what_to_log & (1L << (uint) command))) { - if ((thd->options & OPTION_LOG_OFF) + if ((thd->variables.option_bits & OPTION_LOG_OFF) #ifndef NO_EMBEDDED_ACCESS_CHECKS && (sctx->master_access & SUPER_ACL) #endif @@ -4818,25 +5312,29 @@ bool general_log_write(THD *thd, enum enum_server_command command, } /** + The method executes rotation when LOCK_log is already acquired + by the caller. + + @param force_rotate caller can request the log rotation + @param check_purge is set to true if rotation took place + @note If rotation fails, for instance the server was unable to create a new log file, we still try to write an incident event to the current log. @retval - nonzero - error + nonzero - error in rotating routine. */ -int MYSQL_BIN_LOG::rotate_and_purge(uint flags) +int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) { int error= 0; - DBUG_ENTER("MYSQL_BIN_LOG::rotate_and_purge"); -#ifdef HAVE_REPLICATION - bool check_purge= false; -#endif - if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED)) - pthread_mutex_lock(&LOCK_log); - if ((flags & RP_FORCE_ROTATE) || - (my_b_tell(&log_file) >= (my_off_t) max_size)) + DBUG_ENTER("MYSQL_BIN_LOG::rotate"); + + //todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log); + *check_purge= false; + + if (force_rotate || (my_b_tell(&log_file) >= (my_off_t) max_size)) { if ((error= new_file_without_locking())) /** @@ -4849,37 +5347,73 @@ int MYSQL_BIN_LOG::rotate_and_purge(uint flags) to the current log. */ if (!write_incident_already_locked(current_thd)) - flush_and_sync(); + flush_and_sync(0); -#ifdef HAVE_REPLICATION - check_purge= true; -#endif - if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE) - checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF; // done + *check_purge= true; } - if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED)) - pthread_mutex_unlock(&LOCK_log); + DBUG_RETURN(error); +} + +/** + The method executes logs purging routine. + + @retval + nonzero - error in rotating routine. +*/ +void MYSQL_BIN_LOG::purge() +{ + mysql_mutex_assert_not_owner(&LOCK_log); #ifdef HAVE_REPLICATION - /* - NOTE: Run purge_logs wo/ holding LOCK_log - as it otherwise will deadlock in ndbcluster_binlog_index_purge_file - */ - if (!error && check_purge && expire_logs_days) + if (expire_logs_days) { + DEBUG_SYNC(current_thd, "at_purge_logs_before_date"); time_t purge_time= my_time(0) - expire_logs_days*24*60*60; if (purge_time >= 0) + { purge_logs_before_date(purge_time); + } + DEBUG_SYNC(current_thd, "after_purge_logs_before_date"); } #endif +} + +/** + The method is a shortcut of @c rotate() and @c purge(). + LOCK_log is acquired prior to rotate and is released after it. + + @param force_rotate caller can request the log rotation + + @retval + nonzero - error in rotating routine. +*/ +int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate) +{ + int error= 0; + DBUG_ENTER("MYSQL_BIN_LOG::rotate_and_purge"); + bool check_purge= false; + + //todo: fix the macro def and restore safe_mutex_assert_not_owner(&LOCK_log); + mysql_mutex_lock(&LOCK_log); + if ((error= rotate(force_rotate, &check_purge))) + check_purge= false; + /* + NOTE: Run purge_logs wo/ holding LOCK_log because it does not need + the mutex. Otherwise causes various deadlocks. + */ + mysql_mutex_unlock(&LOCK_log); + + if (check_purge) + purge(); + DBUG_RETURN(error); } uint MYSQL_BIN_LOG::next_file_id() { uint res; - pthread_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_log); res = file_id++; - pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_log); return res; } @@ -4930,7 +5464,7 @@ uint MYSQL_BIN_LOG::next_file_id() int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) { - safe_mutex_assert_owner(&LOCK_log); + mysql_mutex_assert_owner(&LOCK_log); if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) return ER_ERROR_ON_WRITE; uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; @@ -4967,7 +5501,6 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) do { - /* if we only got a partial header in the last iteration, get the other half now and process a full header. @@ -5157,9 +5690,9 @@ int query_error_code(THD *thd, bool not_killed) if (not_killed || (killed_mask_hard(thd->killed) == KILL_BAD_DATA)) { - error= thd->is_error() ? thd->main_da.sql_errno() : 0; + error= thd->is_error() ? thd->stmt_da->sql_errno() : 0; - /* thd->main_da.sql_errno() might be ER_SERVER_SHUTDOWN or + /* thd->stmt_da->sql_errno() might be ER_SERVER_SHUTDOWN or ER_QUERY_INTERRUPTED, So here we need to make sure that error is not set to these errors when specified not_killed by the caller. @@ -5188,7 +5721,6 @@ bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd) if (likely(is_open())) { - ev.pre_55_writing_direct(); error= ev.write(&log_file); status_var_add(thd->status_var.binlog_bytes_written, ev.data_written); } @@ -5201,27 +5733,33 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) { uint error= 0; my_off_t offset; + bool check_purge= false; DBUG_ENTER("MYSQL_BIN_LOG::write_incident"); - pthread_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_log); if (likely(is_open())) { if (!(error= write_incident_already_locked(thd)) && - !(error= flush_and_sync())) + !(error= flush_and_sync(0))) { signal_update(); - error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); + if ((error= rotate(false, &check_purge))) + check_purge= false; } + offset= my_b_tell(&log_file); /* Take mutex to protect against a reader seeing partial writes of 64-bit offset on 32-bit CPUs. */ - pthread_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_lock(&LOCK_commit_ordered); last_commit_pos_offset= offset; - pthread_mutex_unlock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_log); + + if (check_purge) + purge(); } - pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -5251,16 +5789,21 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) */ bool -MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev, bool all) +MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, + binlog_cache_mngr *cache_mngr, + Log_event *end_ev, bool all, + bool using_stmt_cache, + bool using_trx_cache) { group_commit_entry entry; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); entry.thd= thd; - entry.trx_data= trx_data; + entry.cache_mngr= cache_mngr; entry.error= 0; entry.all= all; + entry.using_stmt_cache= using_stmt_cache; + entry.using_trx_cache= using_trx_cache; /* Log "BEGIN" at the beginning of every transaction. Here, a transaction is @@ -5272,10 +5815,12 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, Due to group commit the actual writing to binlog may happen in a different thread. */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE, + TRUE, 0); entry.begin_event= &qinfo; entry.end_event= end_ev; - if (trx_data->has_incident()) + if (cache_mngr->stmt_cache.has_incident() || + cache_mngr->trx_cache.has_incident()) { Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg); entry.incident_event= &inc_ev; @@ -5300,18 +5845,18 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) */ entry->thd->clear_wakeup_ready(); - pthread_mutex_lock(&LOCK_prepare_ordered); + mysql_mutex_lock(&LOCK_prepare_ordered); group_commit_entry *orig_queue= group_commit_queue; entry->next= orig_queue; group_commit_queue= entry; - if (entry->trx_data->using_xa) + if (entry->cache_mngr->using_xa) { DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered"); run_prepare_ordered(entry->thd, entry->all); DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); } - pthread_mutex_unlock(&LOCK_prepare_ordered); + mysql_mutex_unlock(&LOCK_prepare_ordered); DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered"); /* @@ -5327,21 +5872,21 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { /* For the leader, trx_group_commit_leader() already took the lock. */ if (orig_queue != NULL) - pthread_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_lock(&LOCK_commit_ordered); DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered"); ++num_commits; - if (entry->trx_data->using_xa && !entry->error) + if (entry->cache_mngr->using_xa && !entry->error) run_commit_ordered(entry->thd, entry->all); group_commit_entry *next= entry->next; if (!next) { group_commit_queue_busy= FALSE; - pthread_cond_signal(&COND_queue_busy); + mysql_cond_signal(&COND_queue_busy); DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered"); } - pthread_mutex_unlock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_commit_ordered); if (next) { @@ -5359,7 +5904,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) break; case ER_ERROR_ON_READ: my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), - entry->trx_data->trans_log.file_name, entry->commit_errno); + entry->error_cache->file_name, entry->commit_errno); break; default: /* @@ -5377,7 +5922,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) we need to mark it as not needed for recovery (unlog() is not called for a transaction if log_xid() fails). */ - if (entry->trx_data->using_xa && entry->trx_data->xa_xid) + if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid) mark_xid_done(); return 1; @@ -5386,7 +5931,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) /* Do binlog group commit as the lead thread. - This must be called when this thread/transaction is queued at the start of + This must be called when this statement/transaction is queued at the start of the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group commit all the transactions in the queue (more may have entered while waiting for LOCK_log). After commit is done, all other threads in the queue will be @@ -5397,42 +5942,42 @@ void MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) { uint xid_count= 0; - uint write_count= 0; - my_off_t commit_offset; + my_off_t UNINIT_VAR(commit_offset); group_commit_entry *current; group_commit_entry *last_in_queue; - DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); - LINT_INIT(commit_offset); - - /* - Lock the LOCK_log(), and once we get it, collect any additional writes - that queued up while we were waiting. - */ - VOID(pthread_mutex_lock(&LOCK_log)); - DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); - - pthread_mutex_lock(&LOCK_prepare_ordered); - current= group_commit_queue; - group_commit_queue= NULL; - pthread_mutex_unlock(&LOCK_prepare_ordered); - - /* As the queue is in reverse order of entering, reverse it. */ group_commit_entry *queue= NULL; - last_in_queue= current; - while (current) - { - group_commit_entry *next= current->next; - current->next= queue; - queue= current; - current= next; - } - DBUG_ASSERT(leader == queue /* the leader should be first in queue */); + bool check_purge= false; + DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); - /* Now we have in queue the list of transactions to be committed in order. */ DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { /* + Lock the LOCK_log(), and once we get it, collect any additional writes + that queued up while we were waiting. + */ + mysql_mutex_lock(&LOCK_log); + DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); + + mysql_mutex_lock(&LOCK_prepare_ordered); + current= group_commit_queue; + group_commit_queue= NULL; + mysql_mutex_unlock(&LOCK_prepare_ordered); + + /* As the queue is in reverse order of entering, reverse it. */ + last_in_queue= current; + while (current) + { + group_commit_entry *next= current->next; + current->next= queue; + queue= current; + current= next; + } + DBUG_ASSERT(leader == queue /* the leader should be first in queue */); + + /* Now we have in queue the list of transactions to be committed in order. */ + + /* Commit every transaction in the queue. Note that we are doing this in a different thread than the one running @@ -5444,46 +5989,61 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ for (current= queue; current != NULL; current= current->next) { - binlog_trx_data *trx_data= current->trx_data; - IO_CACHE *cache= &trx_data->trans_log; + binlog_cache_mngr *cache_mngr= current->cache_mngr; /* - We only bother to write to the binary log if there is anything - to write. + We already checked before that at least one cache is non-empty; if both + are empty we would have skipped calling into here. */ - if (my_b_tell(cache) > 0) - { - if ((current->error= write_transaction(current))) - current->commit_errno= errno; + DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty()); - write_count++; - } + current->error= write_transaction_or_stmt(current); - strmake(trx_data->last_commit_pos_file, log_file_name, - sizeof(trx_data->last_commit_pos_file)-1); + strmake(cache_mngr->last_commit_pos_file, log_file_name, + sizeof(cache_mngr->last_commit_pos_file)-1); commit_offset= my_b_write_tell(&log_file); - trx_data->last_commit_pos_offset= commit_offset; - if (trx_data->using_xa && trx_data->xa_xid) + cache_mngr->last_commit_pos_offset= commit_offset; + if (cache_mngr->using_xa && cache_mngr->xa_xid) xid_count++; } - if (write_count > 0) + bool synced= 0; + if (flush_and_sync(&synced)) { - if (flush_and_sync()) + for (current= queue; current != NULL; current= current->next) { - for (current= queue; current != NULL; current= current->next) + if (!current->error) { - if (!current->error) - { - current->error= ER_ERROR_ON_WRITE; - current->commit_errno= errno; - } + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= errno; + current->error_cache= NULL; } } - else + } + else + { + bool any_error= false; + bool all_error= true; + for (current= queue; current != NULL; current= current->next) { - signal_update(); + if (!current->error && + RUN_HOOK(binlog_storage, after_flush, + (current->thd, log_file_name, + current->cache_mngr->last_commit_pos_offset, synced))) + { + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= -1; + current->error_cache= NULL; + any_error= true; + } + else + all_error= false; } + + if (any_error) + sql_print_error("Failed to run 'after_flush' hooks"); + if (!all_error) + signal_update(); } /* @@ -5500,7 +6060,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) } else { - if (rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED)) + if (rotate(false, &check_purge)) { /* If we fail to rotate, which thread should get the error? @@ -5509,12 +6069,13 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ last_in_queue->error= ER_ERROR_ON_WRITE; last_in_queue->commit_errno= errno; + check_purge= false; } } } DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered"); - pthread_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_lock(&LOCK_commit_ordered); last_commit_pos_offset= commit_offset; /* We cannot unlock LOCK_log until we have locked LOCK_commit_ordered; @@ -5522,7 +6083,11 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) messing up the order of commit_ordered() calls. But as soon as LOCK_commit_ordered is obtained, we can let the next group commit start. */ - pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_log); + + if (check_purge) + purge(); + DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log"); ++num_group_commits; @@ -5537,7 +6102,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ while (group_commit_queue_busy) - pthread_cond_wait(&COND_queue_busy, &LOCK_commit_ordered); + mysql_cond_wait(&COND_queue_busy, &LOCK_commit_ordered); group_commit_queue_busy= TRUE; /* Note that we return with LOCK_commit_ordered locked! */ @@ -5555,7 +6120,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered"); ++num_commits; - if (current->trx_data->using_xa && !current->error) + if (current->cache_mngr->using_xa && !current->error) run_commit_ordered(current->thd, current->all); /* @@ -5568,63 +6133,91 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) current= next; } DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered"); - pthread_mutex_unlock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_commit_ordered); DBUG_VOID_RETURN; } + int -MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry) +MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) { - binlog_trx_data *trx_data= entry->trx_data; - IO_CACHE *cache= &trx_data->trans_log; + binlog_cache_mngr *mngr= entry->cache_mngr; - entry->begin_event->pre_55_writing_direct(); if (entry->begin_event->write(&log_file)) return ER_ERROR_ON_WRITE; status_var_add(entry->thd->status_var.binlog_bytes_written, entry->begin_event->data_written); - DBUG_EXECUTE_IF("crash_before_writing_xid", - { - if ((write_cache(entry->thd, cache))) - DBUG_PRINT("info", ("error writing binlog cache")); - else - flush_and_sync(); + if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && + write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE))) + { + entry->error_cache= &mngr->stmt_cache.cache_log; + entry->commit_errno= errno; + return ER_ERROR_ON_WRITE; + } + + if (entry->using_trx_cache && !mngr->trx_cache.empty()) + { + DBUG_EXECUTE_IF("crash_before_writing_xid", + { + if ((write_cache(entry->thd, + mngr->get_binlog_cache_log(TRUE)))) + DBUG_PRINT("info", ("error writing binlog cache")); + else + flush_and_sync(0); - DBUG_PRINT("info", ("crashing before writing xid")); - DBUG_SUICIDE(); - }); + DBUG_PRINT("info", ("crashing before writing xid")); + DBUG_SUICIDE(); + }); - if (write_cache(entry->thd, cache)) - return ER_ERROR_ON_WRITE; + if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE))) + { + entry->error_cache= &mngr->trx_cache.cache_log; + entry->commit_errno= errno; + return ER_ERROR_ON_WRITE; + } + } - entry->end_event->pre_55_writing_direct(); if (entry->end_event->write(&log_file)) + { + entry->error_cache= NULL; + entry->commit_errno= errno; return ER_ERROR_ON_WRITE; + } status_var_add(entry->thd->status_var.binlog_bytes_written, entry->end_event->data_written); if (entry->incident_event) { - entry->incident_event->pre_55_writing_direct(); if (entry->incident_event->write(&log_file)) + { + entry->error_cache= NULL; + entry->commit_errno= errno; return ER_ERROR_ON_WRITE; + } } - if (cache->error) // Error on read + if (mngr->get_binlog_cache_log(FALSE)->error) // Error on read + { + entry->error_cache= &mngr->stmt_cache.cache_log; + entry->commit_errno= errno; return ER_ERROR_ON_READ; + } + if (mngr->get_binlog_cache_log(TRUE)->error) // Error on read + { + entry->error_cache= &mngr->trx_cache.cache_log; + entry->commit_errno= errno; + return ER_ERROR_ON_READ; + } return 0; } /** - Wait until we get a signal that the binary log has been updated. + Wait until we get a signal that the relay log has been updated. @param thd Thread variable - @param is_slave If 0, the caller is the Binlog_dump thread from master; - if 1, the caller is the SQL thread from the slave. This - influences only thd->proc_info. @note One must have a lock on LOCK_log before calling this function. @@ -5632,22 +6225,50 @@ MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry) THD::enter_cond() (see NOTES in sql_class.h). */ -void MYSQL_BIN_LOG::wait_for_update(THD* thd, bool is_slave) +void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd) { const char *old_msg; - DBUG_ENTER("wait_for_update"); + DBUG_ENTER("wait_for_update_relay_log"); old_msg= thd->enter_cond(&update_cond, &LOCK_log, - is_slave ? - "Has read all relay log; waiting for the slave I/O " - "thread to update it" : - "Has sent all binlog to slave; waiting for binlog " - "to be updated"); - pthread_cond_wait(&update_cond, &LOCK_log); + "Slave has read all relay log; " + "waiting for the slave I/O " + "thread to update it" ); + mysql_cond_wait(&update_cond, &LOCK_log); thd->exit_cond(old_msg); DBUG_VOID_RETURN; } +/** + Wait until we get a signal that the binary log has been updated. + Applies to master only. + + NOTES + @param[in] thd a THD struct + @param[in] timeout a pointer to a timespec; + NULL means to wait w/o timeout. + @retval 0 if got signalled on update + @retval non-0 if wait timeout elapsed + @note + LOCK_log must be taken before calling this function. + LOCK_log is being released while the thread is waiting. + LOCK_log is released by the caller. +*/ + +int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, + const struct timespec *timeout) +{ + int ret= 0; + DBUG_ENTER("wait_for_update_bin_log"); + + if (!timeout) + mysql_cond_wait(&update_cond, &LOCK_log); + else + ret= mysql_cond_timedwait(&update_cond, &LOCK_log, + const_cast<struct timespec *>(timeout)); + DBUG_RETURN(ret); +} + /** Close the log file. @@ -5679,7 +6300,6 @@ void MYSQL_BIN_LOG::close(uint exiting) (uint8) relay_log_checksum_alg : (uint8) binlog_checksum_options; DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); - s.pre_55_writing_direct(); s.write(&log_file); bytes_written+= s.data_written; signal_update(); @@ -5690,16 +6310,16 @@ void MYSQL_BIN_LOG::close(uint exiting) if (log_file.type == WRITE_CACHE && log_type == LOG_BIN) { my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET; - my_off_t org_position= my_tell(log_file.file, MYF(0)); + my_off_t org_position= mysql_file_tell(log_file.file, MYF(0)); uchar flags= 0; // clearing LOG_EVENT_BINLOG_IN_USE_F - my_pwrite(log_file.file, &flags, 1, offset, MYF(0)); + mysql_file_pwrite(log_file.file, &flags, 1, offset, MYF(0)); /* Restore position so that anything we have in the IO_cache is written to the correct position. - We need the seek here, as my_pwrite() is not guaranteed to keep the + We need the seek here, as mysql_file_pwrite() is not guaranteed to keep the original position on system that doesn't support pwrite(). */ - my_seek(log_file.file, org_position, MY_SEEK_SET, MYF(0)); + mysql_file_seek(log_file.file, org_position, MY_SEEK_SET, MYF(0)); } /* this will cleanup IO_CACHE, sync and close the file */ @@ -5714,14 +6334,15 @@ void MYSQL_BIN_LOG::close(uint exiting) if ((exiting & LOG_CLOSE_INDEX) && my_b_inited(&index_file)) { end_io_cache(&index_file); - if (my_close(index_file.file, MYF(0)) < 0 && ! write_error) + if (mysql_file_close(index_file.file, MYF(0)) < 0 && ! write_error) { write_error= 1; sql_print_error(ER(ER_ERROR_ON_WRITE), index_file_name, errno); } } log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED; - safeFree(name); + my_free(name); + name= NULL; DBUG_VOID_RETURN; } @@ -5736,10 +6357,10 @@ void MYSQL_BIN_LOG::set_max_size(ulong max_size_arg) it's like if the SET command was never run. */ DBUG_ENTER("MYSQL_BIN_LOG::set_max_size"); - pthread_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_log); if (is_open()) max_size= max_size_arg; - pthread_mutex_unlock(&LOCK_log); + mysql_mutex_unlock(&LOCK_log); DBUG_VOID_RETURN; } @@ -5758,11 +6379,11 @@ void MYSQL_BIN_LOG::set_max_size(ulong max_size_arg) @retval 1 String is a number @retval - 0 Error + 0 String is not a number */ static bool test_if_number(register const char *str, - long *res, bool allow_wildcards) + ulong *res, bool allow_wildcards) { reg2 int flag; const char *start; @@ -5858,10 +6479,10 @@ bool flush_error_log() bool result= 0; if (opt_error_log) { - VOID(pthread_mutex_lock(&LOCK_error_log)); + mysql_mutex_lock(&LOCK_error_log); if (redirect_std_streams(log_error_file)) result= 1; - VOID(pthread_mutex_unlock(&LOCK_error_log)); + mysql_mutex_unlock(&LOCK_error_log); } return result; } @@ -5869,11 +6490,12 @@ bool flush_error_log() void MYSQL_BIN_LOG::signal_update() { DBUG_ENTER("MYSQL_BIN_LOG::signal_update"); - pthread_cond_broadcast(&update_cond); + signal_cnt++; + mysql_cond_broadcast(&update_cond); DBUG_VOID_RETURN; } -#ifdef __NT__ +#ifdef _WIN32 static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff, size_t length, size_t buffLen) { @@ -5906,7 +6528,7 @@ static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff, DBUG_VOID_RETURN; } -#endif /* __NT__ */ +#endif /* _WIN32 */ #ifndef EMBEDDED_LIBRARY @@ -5919,7 +6541,7 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer, DBUG_ENTER("print_buffer_to_file"); DBUG_PRINT("enter",("buffer: %s", buffer)); - VOID(pthread_mutex_lock(&LOCK_error_log)); + mysql_mutex_lock(&LOCK_error_log); skr= my_time(0); localtime_r(&skr, &tm_tmp); @@ -5938,7 +6560,7 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer, fflush(stderr); - VOID(pthread_mutex_unlock(&LOCK_error_log)); + mysql_mutex_unlock(&LOCK_error_log); DBUG_VOID_RETURN; } @@ -5967,7 +6589,7 @@ int vprint_msg_to_log(enum loglevel level, const char *format, va_list args) length= my_vsnprintf(buff, sizeof(buff), format, args); print_buffer_to_file(level, buff, length); -#ifdef __NT__ +#ifdef _WIN32 print_buffer_to_nt_eventlog(level, buff, length, sizeof(buff)); #endif @@ -6016,35 +6638,12 @@ void sql_print_information(const char *format, ...) void -TC_init() -{ - my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW, - "LOCK_prepare_ordered", MYF(0)); - my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW, - "LOCK_commit_ordered", MYF(0)); - mutexes_inited= TRUE; -} - - -void -TC_destroy() -{ - if (mutexes_inited) - { - pthread_mutex_destroy(&LOCK_prepare_ordered); - pthread_mutex_destroy(&LOCK_commit_ordered); - mutexes_inited= FALSE; - } -} - - -void TC_LOG::run_prepare_ordered(THD *thd, bool all) { Ha_trx_info *ha_info= all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; - safe_mutex_assert_owner(&LOCK_prepare_ordered); + mysql_mutex_assert_owner(&LOCK_prepare_ordered); for (; ha_info; ha_info= ha_info->next()) { handlerton *ht= ha_info->ht(); @@ -6061,7 +6660,7 @@ TC_LOG::run_commit_ordered(THD *thd, bool all) Ha_trx_info *ha_info= all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; - safe_mutex_assert_owner(&LOCK_commit_ordered); + mysql_mutex_assert_owner(&LOCK_commit_ordered); for (; ha_info; ha_info= ha_info->next()) { handlerton *ht= ha_info->ht(); @@ -6084,7 +6683,7 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, if (need_prepare_ordered) { - pthread_mutex_lock(&LOCK_prepare_ordered); + mysql_mutex_lock(&LOCK_prepare_ordered); run_prepare_ordered(thd, all); if (need_commit_ordered) { @@ -6099,7 +6698,7 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, commit_ordered_queue= &entry; is_group_commit_leader= (previous_queue == NULL); } - pthread_mutex_unlock(&LOCK_prepare_ordered); + mysql_mutex_unlock(&LOCK_prepare_ordered); } cookie= 0; @@ -6121,9 +6720,9 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, if (is_group_commit_leader) { /* The first in queue starts the ball rolling. */ - pthread_mutex_lock(&LOCK_prepare_ordered); + mysql_mutex_lock(&LOCK_prepare_ordered); while (commit_ordered_queue_busy) - pthread_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered); + mysql_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered); commit_entry *queue= commit_ordered_queue; commit_ordered_queue= NULL; /* @@ -6131,7 +6730,7 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, next. */ commit_ordered_queue_busy= true; - pthread_mutex_unlock(&LOCK_prepare_ordered); + mysql_mutex_unlock(&LOCK_prepare_ordered); /* Reverse the queue list so we get correct order. */ commit_entry *prev= NULL; @@ -6154,9 +6753,9 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, /* Only run commit_ordered() if log_xid was successful. */ if (cookie) { - pthread_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_lock(&LOCK_commit_ordered); run_commit_ordered(thd, all); - pthread_mutex_unlock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_commit_ordered); } if (need_prepare_ordered) @@ -6168,10 +6767,10 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, } else { - pthread_mutex_lock(&LOCK_prepare_ordered); + mysql_mutex_lock(&LOCK_prepare_ordered); commit_ordered_queue_busy= false; - pthread_cond_signal(&COND_queue_busy); - pthread_mutex_unlock(&LOCK_prepare_ordered); + mysql_cond_signal(&COND_queue_busy); + mysql_mutex_unlock(&LOCK_prepare_ordered); } } } @@ -6242,17 +6841,18 @@ int TC_LOG_MMAP::open(const char *opt_name) DBUG_ASSERT(TC_LOG_PAGE_SIZE % tc_log_page_size == 0); fn_format(logname,opt_name,mysql_data_home,"",MY_UNPACK_FILENAME); - if ((fd= my_open(logname, O_RDWR, MYF(0))) < 0) + if ((fd= mysql_file_open(key_file_tclog, logname, O_RDWR, MYF(0))) < 0) { if (my_errno != ENOENT) goto err; if (using_heuristic_recover()) return 1; - if ((fd= my_create(logname, CREATE_MODE, O_RDWR, MYF(MY_WME))) < 0) + if ((fd= mysql_file_create(key_file_tclog, logname, CREATE_MODE, + O_RDWR, MYF(MY_WME))) < 0) goto err; inited=1; file_length= opt_tc_log_size; - if (my_chsize(fd, file_length, 0, MYF(MY_WME))) + if (mysql_file_chsize(fd, file_length, 0, MYF(MY_WME))) goto err; } else @@ -6266,7 +6866,7 @@ int TC_LOG_MMAP::open(const char *opt_name) "--tc-heuristic-recover is used"); goto err; } - file_length= my_seek(fd, 0L, MY_SEEK_END, MYF(MY_WME+MY_FAE)); + file_length= mysql_file_seek(fd, 0L, MY_SEEK_END, MYF(MY_WME+MY_FAE)); if (file_length == MY_FILEPOS_ERROR || file_length % tc_log_page_size) goto err; } @@ -6290,9 +6890,9 @@ int TC_LOG_MMAP::open(const char *opt_name) { pg->next=pg+1; pg->waiters=0; - pg->state=POOL; - pthread_mutex_init(&pg->lock, MY_MUTEX_INIT_FAST); - pthread_cond_init (&pg->cond, 0); + pg->state=PS_POOL; + mysql_mutex_init(key_PAGE_lock, &pg->lock, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_PAGE_cond, &pg->cond, 0); pg->ptr= pg->start=(my_xid *)(data + i*tc_log_page_size); pg->size=pg->free=tc_log_page_size/sizeof(my_xid); pg->end=pg->start + pg->size; @@ -6311,12 +6911,12 @@ int TC_LOG_MMAP::open(const char *opt_name) my_msync(fd, data, tc_log_page_size, MS_SYNC); inited=5; - pthread_mutex_init(&LOCK_sync, MY_MUTEX_INIT_FAST); - pthread_mutex_init(&LOCK_active, MY_MUTEX_INIT_FAST); - pthread_mutex_init(&LOCK_pool, MY_MUTEX_INIT_FAST); - pthread_cond_init(&COND_active, 0); - pthread_cond_init(&COND_pool, 0); - pthread_cond_init(&COND_queue_busy, 0); + mysql_mutex_init(key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_active, &LOCK_active, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_pool, &LOCK_pool, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_active, &COND_active, 0); + mysql_cond_init(key_COND_pool, &COND_pool, 0); + mysql_cond_init(key_TC_LOG_MMAP_COND_queue_busy, &COND_queue_busy, 0); inited=6; @@ -6351,7 +6951,7 @@ void TC_LOG_MMAP::get_active_from_pool() PAGE **p, **best_p=0; int best_free; - pthread_mutex_lock(&LOCK_pool); + mysql_mutex_lock(&LOCK_pool); do { @@ -6371,16 +6971,16 @@ void TC_LOG_MMAP::get_active_from_pool() } while ((*best_p == 0 || best_free == 0) && overflow()); - safe_mutex_assert_owner(&LOCK_active); + mysql_mutex_assert_owner(&LOCK_active); active=*best_p; if ((*best_p)->next) // unlink the page from the pool *best_p=(*best_p)->next; else pool_last=*best_p; - pthread_mutex_unlock(&LOCK_pool); + mysql_mutex_unlock(&LOCK_pool); - pthread_mutex_lock(&active->lock); + mysql_mutex_lock(&active->lock); if (active->free == active->size) // we've chosen an empty page { tc_log_cur_pages_used++; @@ -6400,7 +7000,7 @@ int TC_LOG_MMAP::overflow() let's check the behaviour of tc_log_page_waits first */ tc_log_page_waits++; - pthread_cond_wait(&COND_pool, &LOCK_pool); + mysql_cond_wait(&COND_pool, &LOCK_pool); return 1; // always return 1 } @@ -6437,7 +7037,7 @@ int TC_LOG_MMAP::log_one_transaction(my_xid xid) PAGE *p; ulong cookie; - pthread_mutex_lock(&LOCK_active); + mysql_mutex_lock(&LOCK_active); /* if the active page is full - just wait... @@ -6447,13 +7047,13 @@ int TC_LOG_MMAP::log_one_transaction(my_xid xid) unlog() does not signal COND_active. */ while (unlikely(active && active->free == 0)) - pthread_cond_wait(&COND_active, &LOCK_active); + mysql_cond_wait(&COND_active, &LOCK_active); /* no active page ? take one from the pool */ if (active == 0) get_active_from_pool(); else - pthread_mutex_lock(&active->lock); + mysql_mutex_lock(&active->lock); p=active; @@ -6475,51 +7075,51 @@ int TC_LOG_MMAP::log_one_transaction(my_xid xid) cookie= (ulong)((uchar *)p->ptr - data); // can never be zero *p->ptr++= xid; p->free--; - p->state= DIRTY; - pthread_mutex_unlock(&p->lock); + p->state= PS_DIRTY; + mysql_mutex_unlock(&p->lock); - pthread_mutex_lock(&LOCK_sync); + mysql_mutex_lock(&LOCK_sync); if (syncing) { // somebody's syncing. let's wait - pthread_mutex_unlock(&LOCK_active); - pthread_mutex_lock(&p->lock); + mysql_mutex_unlock(&LOCK_active); + mysql_mutex_lock(&p->lock); p->waiters++; for (;;) { - int not_dirty = p->state != DIRTY; - pthread_mutex_unlock(&p->lock); + int not_dirty = p->state != PS_DIRTY; + mysql_mutex_unlock(&p->lock); if (not_dirty || !syncing) break; - pthread_cond_wait(&p->cond, &LOCK_sync); - pthread_mutex_lock(&p->lock); + mysql_cond_wait(&p->cond, &LOCK_sync); + mysql_mutex_lock(&p->lock); } p->waiters--; - err= p->state == ERROR; - if (p->state != DIRTY) // page was synced + err= p->state == PS_ERROR; + if (p->state != PS_DIRTY) // page was synced { - pthread_mutex_unlock(&LOCK_sync); + mysql_mutex_unlock(&LOCK_sync); if (p->waiters == 0) - pthread_cond_signal(&COND_pool); // in case somebody's waiting - pthread_mutex_unlock(&p->lock); + mysql_cond_signal(&COND_pool); // in case somebody's waiting + mysql_mutex_unlock(&p->lock); goto done; // we're done } DBUG_ASSERT(!syncing); - pthread_mutex_unlock(&p->lock); + mysql_mutex_unlock(&p->lock); syncing = p; - pthread_mutex_unlock(&LOCK_sync); + mysql_mutex_unlock(&LOCK_sync); - pthread_mutex_lock(&LOCK_active); + mysql_mutex_lock(&LOCK_active); active=0; // page is not active anymore - pthread_cond_broadcast(&COND_active); - pthread_mutex_unlock(&LOCK_active); + mysql_cond_broadcast(&COND_active); + mysql_mutex_unlock(&LOCK_active); } else { syncing = p; // place is vacant - take it - pthread_mutex_unlock(&LOCK_sync); + mysql_mutex_unlock(&LOCK_sync); active = 0; // page is not active anymore - pthread_cond_broadcast(&COND_active); - pthread_mutex_unlock(&LOCK_active); + mysql_cond_broadcast(&COND_active); + mysql_mutex_unlock(&LOCK_active); } err= sync(); @@ -6540,17 +7140,17 @@ int TC_LOG_MMAP::sync() err= my_msync(fd, syncing->start, syncing->size * sizeof(my_xid), MS_SYNC); /* page is synced. let's move it to the pool */ - pthread_mutex_lock(&LOCK_pool); + mysql_mutex_lock(&LOCK_pool); pool_last->next=syncing; pool_last=syncing; syncing->next=0; - syncing->state= err ? ERROR : POOL; - pthread_cond_signal(&COND_pool); // in case somebody's waiting - pthread_mutex_unlock(&LOCK_pool); + syncing->state= err ? PS_ERROR : PS_POOL; + mysql_cond_signal(&COND_pool); // in case somebody's waiting + mysql_mutex_unlock(&LOCK_pool); /* marking 'syncing' slot free */ - pthread_mutex_lock(&LOCK_sync); - pthread_cond_broadcast(&syncing->cond); // signal "sync done" + mysql_mutex_lock(&LOCK_sync); + mysql_cond_broadcast(&syncing->cond); // signal "sync done" syncing=0; /* we check the "active" pointer without LOCK_active. Still, it's safe - @@ -6561,8 +7161,8 @@ int TC_LOG_MMAP::sync() (the thread that will send a signal below) */ if (active) - pthread_cond_signal(&active->cond); // wake up a new syncer - pthread_mutex_unlock(&LOCK_sync); + mysql_cond_signal(&active->cond); // wake up a new syncer + mysql_mutex_unlock(&LOCK_sync); return err; } @@ -6579,7 +7179,7 @@ int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid) DBUG_ASSERT(*x == xid); DBUG_ASSERT(x >= p->start && x < p->end); - pthread_mutex_lock(&p->lock); + mysql_mutex_lock(&p->lock); *x=0; p->free++; DBUG_ASSERT(p->free <= p->size); @@ -6587,8 +7187,8 @@ int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid) if (p->free == p->size) // the page is completely empty statistic_decrement(tc_log_cur_pages_used, &LOCK_status); if (p->waiters == 0) // the page is in pool and ready to rock - pthread_cond_signal(&COND_pool); // ping ... for overflow() - pthread_mutex_unlock(&p->lock); + mysql_cond_signal(&COND_pool); // ping ... for overflow() + mysql_mutex_unlock(&p->lock); return 0; } @@ -6597,31 +7197,31 @@ void TC_LOG_MMAP::close() uint i; switch (inited) { case 6: - pthread_mutex_destroy(&LOCK_sync); - pthread_mutex_destroy(&LOCK_active); - pthread_mutex_destroy(&LOCK_pool); - pthread_cond_destroy(&COND_pool); - pthread_cond_destroy(&COND_active); - pthread_cond_destroy(&COND_queue_busy); + mysql_mutex_destroy(&LOCK_sync); + mysql_mutex_destroy(&LOCK_active); + mysql_mutex_destroy(&LOCK_pool); + mysql_cond_destroy(&COND_pool); + mysql_cond_destroy(&COND_active); + mysql_cond_destroy(&COND_queue_busy); case 5: - data[0]='A'; // garble the first (signature) byte, in case my_delete fails + data[0]='A'; // garble the first (signature) byte, in case mysql_file_delete fails case 4: for (i=0; i < npages; i++) { if (pages[i].ptr == 0) break; - pthread_mutex_destroy(&pages[i].lock); - pthread_cond_destroy(&pages[i].cond); + mysql_mutex_destroy(&pages[i].lock); + mysql_cond_destroy(&pages[i].cond); } case 3: - my_free((uchar*)pages, MYF(0)); + my_free(pages); case 2: my_munmap((char*)data, (size_t)file_length); case 1: - my_close(fd, MYF(0)); + mysql_file_close(fd, MYF(0)); } if (inited>=5) // cannot do in the switch because of Windows - my_delete(logname, MYF(MY_WME)); + mysql_file_delete(key_file_tclog, logname, MYF(MY_WME)); inited=0; } @@ -6649,8 +7249,8 @@ int TC_LOG_MMAP::recover() goto err1; } - if (hash_init(&xids, &my_charset_bin, tc_log_page_size/3, 0, - sizeof(my_xid), 0, 0, MYF(0))) + if (my_hash_init(&xids, &my_charset_bin, tc_log_page_size/3, 0, + sizeof(my_xid), 0, 0, MYF(0))) goto err1; for ( ; p < end_p ; p++) @@ -6663,12 +7263,12 @@ int TC_LOG_MMAP::recover() if (ha_recover(&xids)) goto err2; - hash_free(&xids); + my_hash_free(&xids); bzero(data, (size_t)file_length); return 0; err2: - hash_free(&xids); + my_hash_free(&xids); err1: sql_print_error("Crash recovery failed. Either correct the problem " "(if it's, for example, out of memory error) and restart, " @@ -6726,8 +7326,9 @@ int TC_LOG_BINLOG::open(const char *opt_name) DBUG_ASSERT(total_ha_2pc > 1); DBUG_ASSERT(opt_name && opt_name[0]); - pthread_mutex_init(&LOCK_prep_xids, MY_MUTEX_INIT_FAST); - pthread_cond_init (&COND_prep_xids, 0); + mysql_mutex_init(key_BINLOG_LOCK_prep_xids, + &LOCK_prep_xids, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_BINLOG_COND_prep_xids, &COND_prep_xids, 0); if (!my_b_inited(&index_file)) { @@ -6794,7 +7395,7 @@ int TC_LOG_BINLOG::open(const char *opt_name) delete ev; end_io_cache(&log); - my_close(file, MYF(MY_WME)); + mysql_file_close(file, MYF(MY_WME)); if (error) goto err; @@ -6808,8 +7409,8 @@ err: void TC_LOG_BINLOG::close() { DBUG_ASSERT(prepared_xids==0); - pthread_mutex_destroy(&LOCK_prep_xids); - pthread_cond_destroy (&COND_prep_xids); + mysql_mutex_destroy(&LOCK_prep_xids); + mysql_cond_destroy(&COND_prep_xids); } /* @@ -6824,27 +7425,12 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, int err; DBUG_ENTER("TC_LOG_BINLOG::log_and_order"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + binlog_cache_mngr *cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - trx_data->using_xa= TRUE; - trx_data->xa_xid= xid; - if (xid) - { - Xid_log_event xid_event(thd, xid); - err= binlog_flush_trx_cache(thd, trx_data, &xid_event, all); - } - else - { - /* - Empty xid occurs in XA COMMIT ... ONE PHASE. - In this case, we do not have a MySQL xid for the transaction, and the - external XA transaction coordinator will have to handle recovery if - needed. So we end the transaction with a plain COMMIT query event. - */ - Query_log_event end_event(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - err= binlog_flush_trx_cache(thd, trx_data, &end_event, all); - } + cache_mngr->using_xa= TRUE; + cache_mngr->xa_xid= xid; + err= binlog_commit_flush_xid_caches(thd, cache_mngr, all, xid); DEBUG_SYNC(thd, "binlog_after_log_and_order"); @@ -6867,9 +7453,9 @@ TC_LOG_BINLOG::mark_xids_active(uint xid_count) { DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active"); DBUG_PRINT("info", ("xid_count=%u", xid_count)); - pthread_mutex_lock(&LOCK_prep_xids); + mysql_mutex_lock(&LOCK_prep_xids); prepared_xids+= xid_count; - pthread_mutex_unlock(&LOCK_prep_xids); + mysql_mutex_unlock(&LOCK_prep_xids); DBUG_VOID_RETURN; } @@ -6886,13 +7472,16 @@ TC_LOG_BINLOG::mark_xid_done() my_bool send_signal; DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done"); - pthread_mutex_lock(&LOCK_prep_xids); - DBUG_ASSERT(prepared_xids > 0); - send_signal= !--prepared_xids; - pthread_mutex_unlock(&LOCK_prep_xids); + mysql_mutex_lock(&LOCK_prep_xids); + // prepared_xids can be 0 if the transaction had ignorable errors. + DBUG_ASSERT(prepared_xids >= 0); + if (prepared_xids > 0) + prepared_xids--; + send_signal= (prepared_xids == 0); + mysql_mutex_unlock(&LOCK_prep_xids); if (send_signal) { DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids)); - pthread_cond_signal(&COND_prep_xids); + mysql_cond_signal(&COND_prep_xids); } DBUG_VOID_RETURN; } @@ -6913,8 +7502,8 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) MEM_ROOT mem_root; if (! fdle->is_valid() || - hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0, - sizeof(my_xid), 0, 0, MYF(0))) + my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0, + sizeof(my_xid), 0, 0, MYF(0))) goto err1; init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE); @@ -6940,12 +7529,12 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) goto err2; free_root(&mem_root, MYF(0)); - hash_free(&xids); + my_hash_free(&xids); return 0; err2: free_root(&mem_root, MYF(0)); - hash_free(&xids); + my_hash_free(&xids); err1: sql_print_error("Crash recovery failed. Either correct the problem " "(if it's, for example, out of memory error) and restart, " @@ -6987,12 +7576,12 @@ ulonglong mysql_bin_log_file_pos(void) void mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file) { - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - if (trx_data) + binlog_cache_mngr *cache_mngr; + if (opt_bin_log && + (cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton))) { - *out_file= trx_data->last_commit_pos_file; - *out_pos= (ulonglong)(trx_data->last_commit_pos_offset); + *out_file= cache_mngr->last_commit_pos_file; + *out_pos= (ulonglong)(cache_mngr->last_commit_pos_offset); } else { @@ -7008,24 +7597,25 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var, void *var_ptr, const void *save) { ulong value= *((ulong *)save); + bool check_purge= false; - pthread_mutex_lock(mysql_bin_log.get_log_lock()); + mysql_mutex_lock(mysql_bin_log.get_log_lock()); if(mysql_bin_log.is_open()) { - uint flags= RP_FORCE_ROTATE | RP_LOCK_LOG_IS_ALREADY_LOCKED | - (binlog_checksum_options != (uint) value? - RP_BINLOG_CHECKSUM_ALG_CHANGE : 0); - if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE) + if (binlog_checksum_options != value) mysql_bin_log.checksum_alg_reset= (uint8) value; - mysql_bin_log.rotate_and_purge(flags); + if (mysql_bin_log.rotate(true, &check_purge)) + check_purge= false; } else { binlog_checksum_options= value; } - DBUG_ASSERT((ulong) binlog_checksum_options == value); - DBUG_ASSERT(mysql_bin_log.checksum_alg_reset == BINLOG_CHECKSUM_ALG_UNDEF); - pthread_mutex_unlock(mysql_bin_log.get_log_lock()); + DBUG_ASSERT(binlog_checksum_options == value); + mysql_bin_log.checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF; + mysql_mutex_unlock(mysql_bin_log.get_log_lock()); + if (check_purge) + mysql_bin_log.purge(); } @@ -7096,15 +7686,15 @@ set_binlog_snapshot_file(const char *src) void TC_LOG_BINLOG::set_status_variables(THD *thd) { - binlog_trx_data *trx_data; + binlog_cache_mngr *cache_mngr; - if (thd) - trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + if (thd && opt_bin_log) + cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); else - trx_data= NULL; + cache_mngr= 0; - bool have_snapshot= (trx_data && trx_data->last_commit_pos_file[0] != 0); - pthread_mutex_lock(&LOCK_commit_ordered); + bool have_snapshot= (cache_mngr && cache_mngr->last_commit_pos_file[0] != 0); + mysql_mutex_lock(&LOCK_commit_ordered); binlog_status_var_num_commits= this->num_commits; binlog_status_var_num_group_commits= this->num_group_commits; if (!have_snapshot) @@ -7112,34 +7702,18 @@ TC_LOG_BINLOG::set_status_variables(THD *thd) set_binlog_snapshot_file(last_commit_pos_file); binlog_snapshot_position= last_commit_pos_offset; } - pthread_mutex_unlock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_commit_ordered); if (have_snapshot) { - set_binlog_snapshot_file(trx_data->last_commit_pos_file); - binlog_snapshot_position= trx_data->last_commit_pos_offset; + set_binlog_snapshot_file(cache_mngr->last_commit_pos_file); + binlog_snapshot_position= cache_mngr->last_commit_pos_offset; } } struct st_mysql_storage_engine binlog_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; -mysql_declare_plugin(binlog) -{ - MYSQL_STORAGE_ENGINE_PLUGIN, - &binlog_storage_engine, - "binlog", - "MySQL AB", - "This is a pseudo storage engine to represent the binlog in a transaction", - PLUGIN_LICENSE_GPL, - binlog_init, /* Plugin Init */ - NULL, /* Plugin Deinit */ - 0x0100 /* 1.0 */, - binlog_status_vars_top, /* status variables */ - binlog_sys_vars, /* system variables */ - NULL /* config options */ -} -mysql_declare_plugin_end; maria_declare_plugin(binlog) { MYSQL_STORAGE_ENGINE_PLUGIN, |