summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc2800
1 files changed, 1687 insertions, 1113 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 53d81f22b66..9e9716dde67 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -1,4 +1,5 @@
/* Copyright (c) 2000, 2011, Oracle and/or its affiliates.
+ Copyright (c) 2010-2011 Monty Program Ab
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -24,25 +25,36 @@
Abort logging when we get an error in reading or writing log files
*/
-#include "mysql_priv.h"
+#include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */
+#include "sql_priv.h"
+#include "log.h"
+#include "sql_base.h" // open_log_table
#include "sql_repl.h"
+#include "sql_delete.h" // mysql_truncate
+#include "sql_parse.h" // command_name
+#include "sql_time.h" // calc_time_from_sec, my_time_compare
+#include "tztime.h" // my_tz_OFFSET0, struct Time_zone
+#include "sql_acl.h" // SUPER_ACL
+#include "log_event.h" // Query_log_event
#include "rpl_filter.h"
#include "rpl_rli.h"
+#include "sql_audit.h"
+#include "log_slow.h"
#include <my_dir.h>
#include <stdarg.h>
#include <m_ctype.h> // For test_if_number
-#ifdef __NT__
+#ifdef _WIN32
#include "message.h"
#endif
-#include <mysql/plugin.h>
+#include "sql_plugin.h"
+#include "rpl_handler.h"
#include "debug_sync.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
-#define MAX_USER_HOST_SIZE 512
#define MAX_TIME_SIZE 32
#define MY_OFF_T_UNDEF (~(my_off_t)0UL)
@@ -50,11 +62,10 @@
LOGGER logger;
-MYSQL_BIN_LOG mysql_bin_log;
-ulong sync_binlog_counter= 0;
+MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period);
static bool test_if_number(const char *str,
- long *res, bool allow_wildcards);
+ ulong *res, bool allow_wildcards);
static int binlog_init(void *p);
static int binlog_close_connection(handlerton *hton, THD *thd);
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv);
@@ -73,9 +84,8 @@ ulong binlog_checksum_options;
ulong opt_binlog_dbug_fsync_sleep= 0;
#endif
-static my_bool mutexes_inited;
-pthread_mutex_t LOCK_prepare_ordered;
-pthread_mutex_t LOCK_commit_ordered;
+mysql_mutex_t LOCK_prepare_ordered;
+mysql_mutex_t LOCK_commit_ordered;
static ulonglong binlog_status_var_num_commits;
static ulonglong binlog_status_var_num_group_commits;
@@ -97,6 +107,35 @@ static SHOW_VAR binlog_status_vars_detail[]=
/**
+ purge logs, master and slave sides both, related error code
+ convertor.
+ Called from @c purge_error_message(), @c MYSQL_BIN_LOG::reset_logs()
+
+ @param res an internal to purging routines error code
+
+ @return the user level error code ER_*
+*/
+uint purge_log_get_error_code(int res)
+{
+ uint errcode= 0;
+
+ switch (res) {
+ case 0: break;
+ case LOG_INFO_EOF: errcode= ER_UNKNOWN_TARGET_BINLOG; break;
+ case LOG_INFO_IO: errcode= ER_IO_ERR_LOG_INDEX_READ; break;
+ case LOG_INFO_INVALID:errcode= ER_BINLOG_PURGE_PROHIBITED; break;
+ case LOG_INFO_SEEK: errcode= ER_FSEEK_FAIL; break;
+ case LOG_INFO_MEM: errcode= ER_OUT_OF_RESOURCES; break;
+ case LOG_INFO_FATAL: errcode= ER_BINLOG_PURGE_FATAL_ERR; break;
+ case LOG_INFO_IN_USE: errcode= ER_LOG_IN_USE; break;
+ case LOG_INFO_EMFILE: errcode= ER_BINLOG_PURGE_EMFILE; break;
+ default: errcode= ER_LOG_PURGE_UNKNOWN_ERR; break;
+ }
+
+ return errcode;
+}
+
+/**
Silence all errors and warnings reported when performing a write
to a log table.
Errors and warnings are not reported to the client or SQL exception
@@ -114,23 +153,28 @@ public:
virtual ~Silence_log_table_errors() {}
- virtual bool handle_error(uint sql_errno, const char *message,
- MYSQL_ERROR::enum_warning_level level,
- THD *thd);
+ virtual bool handle_condition(THD *thd,
+ uint sql_errno,
+ const char* sql_state,
+ MYSQL_ERROR::enum_warning_level level,
+ const char* msg,
+ MYSQL_ERROR ** cond_hdl);
const char *message() const { return m_message; }
};
bool
-Silence_log_table_errors::handle_error(uint /* sql_errno */,
- const char *message_arg,
- MYSQL_ERROR::enum_warning_level /* level */,
- THD * /* thd */)
-{
- strmake(m_message, message_arg, sizeof(m_message)-1);
+Silence_log_table_errors::handle_condition(THD *,
+ uint,
+ const char*,
+ MYSQL_ERROR::enum_warning_level,
+ const char* msg,
+ MYSQL_ERROR ** cond_hdl)
+{
+ *cond_hdl= NULL;
+ strmake(m_message, msg, sizeof(m_message)-1);
return TRUE;
}
-
sql_print_message_func sql_print_message_handlers[3] =
{
sql_print_information,
@@ -139,149 +183,285 @@ sql_print_message_func sql_print_message_handlers[3] =
};
-char *make_default_log_name(char *buff,const char* log_ext)
-{
- strmake(buff, pidfile_name, FN_REFLEN-5);
- return fn_format(buff, buff, mysql_data_home, log_ext,
- MYF(MY_UNPACK_FILENAME|MY_REPLACE_EXT));
-}
-
-
-/*
- Create a filename from a base with a given suffix.
- The name is allocated trough my_once_alloc(), so one should only
- use this for startup options that can all be freed at once.
+/**
+ Create the name of the log file
+
+ @param[OUT] out a pointer to a new allocated name will go there
+ @param[IN] log_ext The extension for the file (e.g .log)
+ @param[IN] once whether to use malloc_once or a normal malloc.
*/
-
-char *make_once_alloced_filename(const char *basename, const char *ext)
+void make_default_log_name(char **out, const char* log_ext, bool once)
{
- char buff[FN_REFLEN+10], *end, *res;
- size_t length;
- strmake(buff, basename, sizeof(buff)-10);
- end= strmov(fn_ext(buff), ext);
- length= (size_t) (end - buff) + 1;
-
- if ((res= (char*) my_once_alloc(length, MYF(MY_WME))))
- memcpy(res, buff, length);
- return res;
+ char buff[FN_REFLEN+10];
+ fn_format(buff, opt_log_basename, "", log_ext, MYF(MY_REPLACE_EXT));
+ if (once)
+ *out= my_once_strdup(buff, MYF(MY_WME));
+ else
+ {
+ my_free(*out);
+ *out= my_strdup(buff, MYF(MY_WME));
+ }
}
/*
- Helper class to store binary log transaction data.
+ Helper classes to store non-transactional and transactional data
+ before copying it to the binary log.
*/
-class binlog_trx_data {
+class binlog_cache_data
+{
public:
- binlog_trx_data()
- : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0),
- before_stmt_pos(MY_OFF_T_UNDEF), last_commit_pos_offset(0),
- using_xa(FALSE), xa_xid(0)
+ binlog_cache_data(): m_pending(0), before_stmt_pos(MY_OFF_T_UNDEF),
+ incident(FALSE), changes_to_non_trans_temp_table_flag(FALSE),
+ saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0),
+ ptr_binlog_cache_disk_use(0)
+ { }
+
+ ~binlog_cache_data()
{
- trans_log.end_of_file= max_binlog_cache_size;
- last_commit_pos_file[0]= 0;
+ DBUG_ASSERT(empty());
+ close_cached_file(&cache_log);
}
- ~binlog_trx_data()
+ bool empty() const
{
- DBUG_ASSERT(pending() == NULL);
- close_cached_file(&trans_log);
+ return pending() == NULL && my_b_tell(&cache_log) == 0;
}
- my_off_t position() const {
- return my_b_tell(&trans_log);
+ Rows_log_event *pending() const
+ {
+ return m_pending;
}
- bool empty() const
+ void set_pending(Rows_log_event *const pending)
{
- return pending() == NULL && my_b_tell(&trans_log) == 0;
+ m_pending= pending;
}
- /*
- Truncate the transaction cache to a certain position. This
- includes deleting the pending event.
- */
- void truncate(my_off_t pos)
+ void set_incident(void)
{
- DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos));
- DBUG_PRINT("info", ("before_stmt_pos=%lu", (ulong) pos));
- if (pending())
- {
- delete pending();
- }
- set_pending(0);
- reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0);
- trans_log.end_of_file= max_binlog_cache_size;
- if (pos < before_stmt_pos)
- before_stmt_pos= MY_OFF_T_UNDEF;
+ incident= TRUE;
+ }
+
+ bool has_incident(void)
+ {
+ return(incident);
+ }
- /*
- The only valid positions that can be truncated to are at the
- beginning of a statement. We are relying on this fact to be able
- to set the at_least_one_stmt_committed flag correctly. In other word, if
- we are truncating to the beginning of the transaction cache,
- there will be no statements in the cache, otherwhise, we will
- have at least one statement in the transaction cache.
- */
- at_least_one_stmt_committed= (pos > 0);
+ void set_changes_to_non_trans_temp_table()
+ {
+ changes_to_non_trans_temp_table_flag= TRUE;
}
- /*
- Reset the entire contents of the transaction cache, emptying it
- completely.
- */
- void reset() {
- if (trans_log.type != WRITE_CACHE || !empty())
- truncate(0);
- before_stmt_pos= MY_OFF_T_UNDEF;
+ bool changes_to_non_trans_temp_table()
+ {
+ return (changes_to_non_trans_temp_table_flag);
+ }
+
+ void reset()
+ {
+ compute_statistics();
+ truncate(0);
+ changes_to_non_trans_temp_table_flag= FALSE;
incident= FALSE;
- trans_log.end_of_file= max_binlog_cache_size;
- using_xa= FALSE;
- last_commit_pos_file[0]= 0;
- last_commit_pos_offset= 0;
+ before_stmt_pos= MY_OFF_T_UNDEF;
+ /*
+ The truncate function calls reinit_io_cache that calls my_b_flush_io_cache
+ which may increase disk_writes. This breaks the disk_writes use by the
+ binary log which aims to compute the ratio between in-memory cache usage
+ and disk cache usage. To avoid this undesirable behavior, we reset the
+ variable after truncating the cache.
+ */
+ cache_log.disk_writes= 0;
DBUG_ASSERT(empty());
}
- Rows_log_event *pending() const
+ my_off_t get_byte_position() const
{
- return m_pending;
+ return my_b_tell(&cache_log);
}
- void set_pending(Rows_log_event *const pending)
+ my_off_t get_prev_position()
{
- m_pending= pending;
+ return(before_stmt_pos);
}
- IO_CACHE trans_log; // The transaction cache
-
- void set_incident(void)
+ void set_prev_position(my_off_t pos)
{
- incident= TRUE;
+ before_stmt_pos= pos;
}
- bool has_incident(void)
+ void restore_prev_position()
{
- return(incident);
+ truncate(before_stmt_pos);
}
- /**
- Boolean that is true if there is at least one statement in the
- transaction cache.
+ void restore_savepoint(my_off_t pos)
+ {
+ truncate(pos);
+ if (pos < before_stmt_pos)
+ before_stmt_pos= MY_OFF_T_UNDEF;
+ }
+
+ void set_binlog_cache_info(ulong param_max_binlog_cache_size,
+ ulong *param_ptr_binlog_cache_use,
+ ulong *param_ptr_binlog_cache_disk_use)
+ {
+ /*
+ The assertions guarantee that the set_binlog_cache_info is
+ called just once and information passed as parameters are
+ never zero.
+
+ This is done while calling the constructor binlog_cache_mngr.
+ We cannot set informaton in the constructor binlog_cache_data
+ because the space for binlog_cache_mngr is allocated through
+ a placement new.
+
+ In the future, we can refactor this and change it to avoid
+ the set_binlog_info.
+ */
+ DBUG_ASSERT(saved_max_binlog_cache_size == 0 &&
+ param_max_binlog_cache_size != 0 &&
+ ptr_binlog_cache_use == 0 &&
+ param_ptr_binlog_cache_use != 0 &&
+ ptr_binlog_cache_disk_use == 0 &&
+ param_ptr_binlog_cache_disk_use != 0);
+
+ saved_max_binlog_cache_size= param_max_binlog_cache_size;
+ ptr_binlog_cache_use= param_ptr_binlog_cache_use;
+ ptr_binlog_cache_disk_use= param_ptr_binlog_cache_disk_use;
+ cache_log.end_of_file= saved_max_binlog_cache_size;
+ }
+
+ /*
+ Cache to store data before copying it to the binary log.
*/
- bool at_least_one_stmt_committed;
- bool incident;
+ IO_CACHE cache_log;
private:
/*
- Pending binrows event. This event is the event where the rows are
- currently written.
+ Pending binrows event. This event is the event where the rows are currently
+ written.
*/
Rows_log_event *m_pending;
-public:
/*
Binlog position before the start of the current statement.
*/
my_off_t before_stmt_pos;
+
+ /*
+ This indicates that some events did not get into the cache and most likely
+ it is corrupted.
+ */
+ bool incident;
+
+ /*
+ This flag indicates if the cache has changes to temporary tables.
+ @TODO This a temporary fix and should be removed after BUG#54562.
+ */
+ bool changes_to_non_trans_temp_table_flag;
+
+ /**
+ This function computes binlog cache and disk usage.
+ */
+ void compute_statistics()
+ {
+ if (!empty())
+ {
+ statistic_increment(*ptr_binlog_cache_use, &LOCK_status);
+ if (cache_log.disk_writes != 0)
+ statistic_increment(*ptr_binlog_cache_disk_use, &LOCK_status);
+ }
+ }
+
+ /*
+ Stores the values of maximum size of the cache allowed when this cache
+ is configured. This corresponds to either
+ . max_binlog_cache_size or max_binlog_stmt_cache_size.
+ */
+ ulong saved_max_binlog_cache_size;
+
+ /*
+ Stores a pointer to the status variable that keeps track of the in-memory
+ cache usage. This corresponds to either
+ . binlog_cache_use or binlog_stmt_cache_use.
+ */
+ ulong *ptr_binlog_cache_use;
+
+ /*
+ Stores a pointer to the status variable that keeps track of the disk
+ cache usage. This corresponds to either
+ . binlog_cache_disk_use or binlog_stmt_cache_disk_use.
+ */
+ ulong *ptr_binlog_cache_disk_use;
+
+ /*
+ It truncates the cache to a certain position. This includes deleting the
+ pending event.
+ */
+ void truncate(my_off_t pos)
+ {
+ DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos));
+ if (pending())
+ {
+ delete pending();
+ set_pending(0);
+ }
+ reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, 0);
+ cache_log.end_of_file= saved_max_binlog_cache_size;
+ }
+
+ binlog_cache_data& operator=(const binlog_cache_data& info);
+ binlog_cache_data(const binlog_cache_data& info);
+};
+
+class binlog_cache_mngr {
+public:
+ binlog_cache_mngr(ulong param_max_binlog_stmt_cache_size,
+ ulong param_max_binlog_cache_size,
+ ulong *param_ptr_binlog_stmt_cache_use,
+ ulong *param_ptr_binlog_stmt_cache_disk_use,
+ ulong *param_ptr_binlog_cache_use,
+ ulong *param_ptr_binlog_cache_disk_use)
+ : last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0)
+ {
+ stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size,
+ param_ptr_binlog_stmt_cache_use,
+ param_ptr_binlog_stmt_cache_disk_use);
+ trx_cache.set_binlog_cache_info(param_max_binlog_cache_size,
+ param_ptr_binlog_cache_use,
+ param_ptr_binlog_cache_disk_use);
+ last_commit_pos_file[0]= 0;
+ }
+
+ void reset(bool do_stmt, bool do_trx)
+ {
+ if (do_stmt)
+ stmt_cache.reset();
+ if (do_trx)
+ {
+ trx_cache.reset();
+ using_xa= FALSE;
+ last_commit_pos_file[0]= 0;
+ last_commit_pos_offset= 0;
+ }
+ }
+
+ binlog_cache_data* get_binlog_cache_data(bool is_transactional)
+ {
+ return (is_transactional ? &trx_cache : &stmt_cache);
+ }
+
+ IO_CACHE* get_binlog_cache_log(bool is_transactional)
+ {
+ return (is_transactional ? &trx_cache.cache_log : &stmt_cache.cache_log);
+ }
+
+ binlog_cache_data stmt_cache;
+
+ binlog_cache_data trx_cache;
+
/*
Binlog position for current transaction.
For START TRANSACTION WITH CONSISTENT SNAPSHOT, this is the binlog
@@ -298,6 +478,11 @@ public:
*/
bool using_xa;
my_xid xa_xid;
+
+private:
+
+ binlog_cache_mngr& operator=(const binlog_cache_mngr& info);
+ binlog_cache_mngr(const binlog_cache_mngr& info);
};
handlerton *binlog_hton;
@@ -317,8 +502,8 @@ bool LOGGER::is_log_table_enabled(uint log_table_type)
/* Check if a given table is opened log table */
-int check_if_log_table(uint db_len, const char *db, uint table_name_len,
- const char *table_name, uint check_if_opened)
+int check_if_log_table(size_t db_len, const char *db, size_t table_name_len,
+ const char *table_name, bool check_if_opened)
{
if (db_len == 5 &&
!(lower_case_table_names ?
@@ -410,7 +595,7 @@ bool Log_to_csv_event_handler::
bool need_rnd_end= FALSE;
uint field_index;
Silence_log_table_errors error_handler;
- Open_tables_state open_tables_backup;
+ Open_tables_backup open_tables_backup;
ulonglong save_thd_options;
bool save_time_zone_used;
DBUG_ENTER("log_general");
@@ -421,20 +606,16 @@ bool Log_to_csv_event_handler::
*/
save_time_zone_used= thd->time_zone_used;
- save_thd_options= thd->options;
- thd->options&= ~OPTION_BIN_LOG;
-
- bzero(& table_list, sizeof(TABLE_LIST));
- table_list.alias= table_list.table_name= GENERAL_LOG_NAME.str;
- table_list.table_name_length= GENERAL_LOG_NAME.length;
-
- table_list.lock_type= TL_WRITE_CONCURRENT_INSERT;
+ save_thd_options= thd->variables.option_bits;
+ thd->variables.option_bits&= ~OPTION_BIN_LOG;
- table_list.db= MYSQL_SCHEMA_NAME.str;
- table_list.db_length= MYSQL_SCHEMA_NAME.length;
+ table_list.init_one_table(MYSQL_SCHEMA_NAME.str, MYSQL_SCHEMA_NAME.length,
+ GENERAL_LOG_NAME.str, GENERAL_LOG_NAME.length,
+ GENERAL_LOG_NAME.str,
+ TL_WRITE_CONCURRENT_INSERT);
/*
- 1) open_performance_schema_table generates an error of the
+ 1) open_log_table generates an error of the
table can not be opened or is corrupted.
2) "INSERT INTO general_log" can generate warning sometimes.
@@ -447,8 +628,7 @@ bool Log_to_csv_event_handler::
thd->push_internal_handler(& error_handler);
need_pop= TRUE;
- if (!(table= open_performance_schema_table(thd, & table_list,
- & open_tables_backup)))
+ if (!(table= open_log_table(thd, &table_list, &open_tables_backup)))
goto err;
need_close= TRUE;
@@ -528,9 +708,9 @@ err:
if (need_pop)
thd->pop_internal_handler();
if (need_close)
- close_performance_schema_table(thd, & open_tables_backup);
+ close_log_table(thd, &open_tables_backup);
- thd->options= save_thd_options;
+ thd->variables.option_bits= save_thd_options;
thd->time_zone_used= save_time_zone_used;
DBUG_RETURN(result);
}
@@ -576,7 +756,7 @@ bool Log_to_csv_event_handler::
bool need_close= FALSE;
bool need_rnd_end= FALSE;
Silence_log_table_errors error_handler;
- Open_tables_state open_tables_backup;
+ Open_tables_backup open_tables_backup;
CHARSET_INFO *client_cs= thd->variables.character_set_client;
bool save_time_zone_used;
long query_time= (long) min(query_utime/1000000, TIME_MAX_VALUE_SECONDS);
@@ -593,17 +773,12 @@ bool Log_to_csv_event_handler::
*/
save_time_zone_used= thd->time_zone_used;
- bzero(& table_list, sizeof(TABLE_LIST));
- table_list.alias= table_list.table_name= SLOW_LOG_NAME.str;
- table_list.table_name_length= SLOW_LOG_NAME.length;
+ table_list.init_one_table(MYSQL_SCHEMA_NAME.str, MYSQL_SCHEMA_NAME.length,
+ SLOW_LOG_NAME.str, SLOW_LOG_NAME.length,
+ SLOW_LOG_NAME.str,
+ TL_WRITE_CONCURRENT_INSERT);
- table_list.lock_type= TL_WRITE_CONCURRENT_INSERT;
-
- table_list.db= MYSQL_SCHEMA_NAME.str;
- table_list.db_length= MYSQL_SCHEMA_NAME.length;
-
- if (!(table= open_performance_schema_table(thd, & table_list,
- & open_tables_backup)))
+ if (!(table= open_log_table(thd, &table_list, &open_tables_backup)))
goto err;
need_close= TRUE;
@@ -717,7 +892,7 @@ err:
table->file->ha_release_auto_increment();
}
if (need_close)
- close_performance_schema_table(thd, & open_tables_backup);
+ close_log_table(thd, &open_tables_backup);
thd->time_zone_used= save_time_zone_used;
DBUG_RETURN(result);
}
@@ -727,36 +902,31 @@ int Log_to_csv_event_handler::
{
TABLE_LIST table_list;
TABLE *table;
+ LEX_STRING *UNINIT_VAR(log_name);
int result;
- Open_tables_state open_tables_backup;
+ Open_tables_backup open_tables_backup;
DBUG_ENTER("Log_to_csv_event_handler::activate_log");
- bzero(& table_list, sizeof(TABLE_LIST));
-
if (log_table_type == QUERY_LOG_GENERAL)
{
- table_list.alias= table_list.table_name= GENERAL_LOG_NAME.str;
- table_list.table_name_length= GENERAL_LOG_NAME.length;
+ log_name= &GENERAL_LOG_NAME;
}
else
{
DBUG_ASSERT(log_table_type == QUERY_LOG_SLOW);
- table_list.alias= table_list.table_name= SLOW_LOG_NAME.str;
- table_list.table_name_length= SLOW_LOG_NAME.length;
- }
-
- table_list.lock_type= TL_WRITE_CONCURRENT_INSERT;
- table_list.db= MYSQL_SCHEMA_NAME.str;
- table_list.db_length= MYSQL_SCHEMA_NAME.length;
+ log_name= &SLOW_LOG_NAME;
+ }
+ table_list.init_one_table(MYSQL_SCHEMA_NAME.str, MYSQL_SCHEMA_NAME.length,
+ log_name->str, log_name->length, log_name->str,
+ TL_WRITE_CONCURRENT_INSERT);
- table= open_performance_schema_table(thd, & table_list,
- & open_tables_backup);
+ table= open_log_table(thd, &table_list, &open_tables_backup);
if (table)
{
result= 0;
- close_performance_schema_table(thd, & open_tables_backup);
+ close_log_table(thd, &open_tables_backup);
}
else
result= 1;
@@ -833,10 +1003,10 @@ bool Log_to_file_event_handler::init()
if (!is_initialized)
{
if (opt_slow_log)
- mysql_slow_log.open_slow_log(sys_var_slow_log_path.value);
+ mysql_slow_log.open_slow_log(opt_slow_logname);
if (opt_log)
- mysql_log.open_query_log(sys_var_general_log_path.value);
+ mysql_log.open_query_log(opt_logname);
is_initialized= TRUE;
}
@@ -860,13 +1030,6 @@ void Log_to_file_event_handler::flush()
mysql_slow_log.reopen_file();
}
-void Log_to_file_event_handler::flush_slow_log()
-{
- /* reopen slow log file */
- if (opt_slow_log)
- mysql_slow_log.reopen_file();
-}
-
/*
Log error with all enabled log event handlers
@@ -900,7 +1063,7 @@ bool LOGGER::error_log_print(enum loglevel level, const char *format,
void LOGGER::cleanup_base()
{
DBUG_ASSERT(inited == 1);
- rwlock_destroy(&LOCK_logger);
+ mysql_rwlock_destroy(&LOCK_logger);
if (table_log_handler)
{
table_log_handler->cleanup();
@@ -945,7 +1108,7 @@ void LOGGER::init_base()
init_error_log(LOG_FILE);
file_log_handler->init_pthread_objects();
- my_rwlock_init(&LOCK_logger, NULL);
+ mysql_rwlock_init(key_rwlock_LOCK_logger, &LOCK_logger);
}
@@ -977,7 +1140,12 @@ bool LOGGER::flush_logs(THD *thd)
}
-bool LOGGER::flush_slow_log(THD *thd)
+/**
+ Close and reopen the slow log (with locks).
+
+ @returns FALSE.
+*/
+bool LOGGER::flush_slow_log()
{
/*
Now we lock logger, as nobody should be able to use logging routines while
@@ -985,11 +1153,37 @@ bool LOGGER::flush_slow_log(THD *thd)
*/
logger.lock_exclusive();
- /* reopen log files */
- file_log_handler->flush_slow_log();
+ /* Reopen slow log file */
+ if (opt_slow_log)
+ file_log_handler->get_mysql_slow_log()->reopen_file();
- /* end of log flush */
+ /* End of log flush */
+ logger.unlock();
+
+ return 0;
+}
+
+
+/**
+ Close and reopen the general log (with locks).
+
+ @returns FALSE.
+*/
+bool LOGGER::flush_general_log()
+{
+ /*
+ Now we lock logger, as nobody should be able to use logging routines while
+ log tables are closed
+ */
+ logger.lock_exclusive();
+
+ /* Reopen general log file */
+ if (opt_log)
+ file_log_handler->get_mysql_log()->reopen_file();
+
+ /* End of log flush */
logger.unlock();
+
return 0;
}
@@ -1079,7 +1273,6 @@ bool LOGGER::general_log_write(THD *thd, enum enum_server_command command,
bool error= FALSE;
Log_event_handler **current_handler= general_log_handler_list;
char user_host_buff[MAX_USER_HOST_SIZE + 1];
- Security_context *sctx= thd->security_ctx;
uint user_host_len= 0;
my_hrtime_t current_time;
@@ -1091,14 +1284,16 @@ bool LOGGER::general_log_write(THD *thd, enum enum_server_command command,
unlock();
return 0;
}
- user_host_len= strxnmov(user_host_buff, MAX_USER_HOST_SIZE,
- sctx->priv_user ? sctx->priv_user : "", "[",
- sctx->user ? sctx->user : "", "] @ ",
- sctx->host ? sctx->host : "", " [",
- sctx->ip ? sctx->ip : "", "]", NullS) -
- user_host_buff;
+ user_host_len= make_user_name(thd, user_host_buff);
current_time= my_hrtime();
+
+ mysql_audit_general_log(thd, hrtime_to_time(current_time),
+ user_host_buff, user_host_len,
+ command_name[(uint) command].str,
+ command_name[(uint) command].length,
+ query, query_length);
+
while (*current_handler)
error|= (*current_handler++)->
log_general(thd, current_time, user_host_buff,
@@ -1128,7 +1323,7 @@ bool LOGGER::general_log_print(THD *thd, enum enum_server_command command,
return general_log_write(thd, command, message_buff, message_buff_len);
}
-void LOGGER::init_error_log(uint error_log_printer)
+void LOGGER::init_error_log(ulonglong error_log_printer)
{
if (error_log_printer & LOG_NONE)
{
@@ -1151,7 +1346,7 @@ void LOGGER::init_error_log(uint error_log_printer)
}
}
-void LOGGER::init_slow_log(uint slow_log_printer)
+void LOGGER::init_slow_log(ulonglong slow_log_printer)
{
if (slow_log_printer & LOG_NONE)
{
@@ -1176,7 +1371,7 @@ void LOGGER::init_slow_log(uint slow_log_printer)
}
}
-void LOGGER::init_general_log(uint general_log_printer)
+void LOGGER::init_general_log(ulonglong general_log_printer)
{
if (general_log_printer & LOG_NONE)
{
@@ -1213,7 +1408,7 @@ bool LOGGER::activate_log_handler(THD* thd, uint log_type)
{
file_log= file_log_handler->get_mysql_slow_log();
- file_log->open_slow_log(sys_var_slow_log_path.value);
+ file_log->open_slow_log(opt_slow_logname);
if (table_log_handler->activate_log(thd, QUERY_LOG_SLOW))
{
/* Error printed by open table in activate_log() */
@@ -1232,7 +1427,7 @@ bool LOGGER::activate_log_handler(THD* thd, uint log_type)
{
file_log= file_log_handler->get_mysql_log();
- file_log->open_query_log(sys_var_general_log_path.value);
+ file_log->open_query_log(opt_logname);
if (table_log_handler->activate_log(thd, QUERY_LOG_GENERAL))
{
/* Error printed by open table in activate_log() */
@@ -1289,9 +1484,9 @@ bool Log_to_csv_event_handler::init()
return 0;
}
-int LOGGER::set_handlers(uint error_log_printer,
- uint slow_log_printer,
- uint general_log_printer)
+int LOGGER::set_handlers(ulonglong error_log_printer,
+ ulonglong slow_log_printer,
+ ulonglong general_log_printer)
{
/* error log table is not supported yet */
DBUG_ASSERT(error_log_printer < LOG_TABLE);
@@ -1317,26 +1512,6 @@ int LOGGER::set_handlers(uint error_log_printer,
return 0;
}
-/**
- This function checks if a transactional talbe was updated by the
- current statement.
-
- @param thd The client thread that executed the current statement.
- @return
- @c true if a transactional table was updated, @false otherwise.
-*/
-static bool stmt_has_updated_trans_table(THD *thd)
-{
- Ha_trx_info *ha_info;
-
- for (ha_info= thd->transaction.stmt.ha_list; ha_info && ha_info->is_started(); ha_info= ha_info->next())
- {
- if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
- return (TRUE);
- }
- return (FALSE);
-}
-
/*
Save position of binary log transaction cache.
@@ -1359,10 +1534,10 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos)
DBUG_ASSERT(pos != NULL);
if (thd_get_ha_data(thd, binlog_hton) == NULL)
thd->binlog_setup_trx_data();
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
DBUG_ASSERT(mysql_bin_log.is_open());
- *pos= trx_data->position();
+ *pos= cache_mngr->trx_cache.get_byte_position();
DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos));
DBUG_VOID_RETURN;
}
@@ -1393,9 +1568,9 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos)
/* Only true if binlog_trans_log_savepos() wasn't called before */
DBUG_ASSERT(pos != ~(my_off_t) 0);
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
- trx_data->truncate(pos);
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ cache_mngr->trx_cache.restore_savepoint(pos);
DBUG_VOID_RETURN;
}
@@ -1425,130 +1600,214 @@ int binlog_init(void *p)
static int binlog_close_connection(handlerton *hton, THD *thd)
{
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
- DBUG_ASSERT(trx_data->empty());
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ DBUG_ASSERT(cache_mngr->trx_cache.empty() && cache_mngr->stmt_cache.empty());
thd_set_ha_data(thd, binlog_hton, NULL);
- trx_data->~binlog_trx_data();
- my_free((uchar*)trx_data, MYF(0));
+ cache_mngr->~binlog_cache_mngr();
+ my_free(cache_mngr);
return 0;
}
/*
- End a transaction, writing events to the binary log.
+ This function flushes a cache upon commit/rollback.
SYNOPSIS
- binlog_flush_trx_cache()
+ binlog_flush_cache()
- thd The thread whose transaction should be ended
- trx_data Pointer to the transaction data to use
- all True if the entire transaction should be ended, false if
- only the statement transaction should be ended.
- end_ev The end event to use (COMMIT, ROLLBACK, or commit XID)
+ thd The thread whose transaction should be ended
+ cache_mngr Pointer to the binlog_cache_mngr to use
+ all True if the entire transaction should be ended, false if
+ only the statement transaction should be ended.
+ end_ev The end event to use (COMMIT, ROLLBACK, or commit XID)
+ using_stmt True if the statement cache should be flushed
+ using_trx True if the transaction cache should be flushed
DESCRIPTION
- End the currently open transaction. The transaction can be either
+ End the currently transaction or statement. The transaction can be either
a real transaction or a statement transaction.
This can be to commit a transaction, with a COMMIT query event or an XA
commit XID event. But it can also be to rollback a transaction with a
ROLLBACK query event, used for rolling back transactions which also
- contain updates to non-transactional tables.
+ contain updates to non-transactional tables. Or it can be a flush of
+ a statement cache.
*/
static int
-binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data,
- Log_event *end_ev, bool all)
+binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
+ Log_event *end_ev, bool all, bool using_stmt,
+ bool using_trx)
{
- DBUG_ENTER("binlog_flush_trx_cache");
- IO_CACHE *trans_log= &trx_data->trans_log;
- DBUG_PRINT("info", ("thd->options={ %s%s}",
- FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
- FLAGSTR(thd->options, OPTION_BEGIN)));
-
- if (thd->binlog_flush_pending_rows_event(TRUE))
- DBUG_RETURN(1);
-
- /*
- Doing a commit or a rollback including non-transactional tables,
- i.e., ending a transaction where we might write the transaction
- cache to the binary log.
-
- We can always end the statement when ending a transaction since
- transactions are not allowed inside stored functions. If they
- were, we would have to ensure that we're not ending a statement
- inside a stored function.
- */
- int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data,
- end_ev, all);
-
- trx_data->reset();
+ int error= 0;
+ DBUG_ENTER("binlog_flush_cache");
- statistic_increment(binlog_cache_use, &LOCK_status);
- if (trans_log->disk_writes != 0)
+ if ((using_stmt && !cache_mngr->stmt_cache.empty()) ||
+ (using_trx && !cache_mngr->trx_cache.empty()))
{
- statistic_increment(binlog_cache_disk_use, &LOCK_status);
- trans_log->disk_writes= 0;
+ if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE))
+ DBUG_RETURN(1);
+ if (using_trx && thd->binlog_flush_pending_rows_event(TRUE, TRUE))
+ DBUG_RETURN(1);
+
+ /*
+ Doing a commit or a rollback including non-transactional tables,
+ i.e., ending a transaction where we might write the transaction
+ cache to the binary log.
+
+ We can always end the statement when ending a transaction since
+ transactions are not allowed inside stored functions. If they
+ were, we would have to ensure that we're not ending a statement
+ inside a stored function.
+ */
+ error= mysql_bin_log.write_transaction_to_binlog(thd, cache_mngr,
+ end_ev, all,
+ using_stmt, using_trx);
}
+ cache_mngr->reset(using_stmt, using_trx);
- DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL);
+ DBUG_ASSERT((!using_stmt || cache_mngr->stmt_cache.empty()) &&
+ (!using_trx || cache_mngr->trx_cache.empty()));
DBUG_RETURN(error);
}
-/*
- Discard a transaction, ie. ROLLBACK with only transactional table updates.
- SYNOPSIS
- binlog_truncate_trx_cache()
+/**
+ This function flushes the stmt-cache upon commit.
- thd The thread whose transaction should be ended
- trx_data Pointer to the transaction data to use
- all True if the entire transaction should be ended, false if
- only the statement transaction should be ended.
+ @param thd The thread whose transaction should be flushed
+ @param cache_mngr Pointer to the cache manager
- DESCRIPTION
+ @return
+ nonzero if an error pops up when flushing the cache.
+*/
+static inline int
+binlog_commit_flush_stmt_cache(THD *thd, bool all,
+ binlog_cache_mngr *cache_mngr)
+{
+ Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
+ FALSE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE));
+}
- Rollback (and end) a transaction that only modifies transactional
- tables. The transaction can be either a real transaction (if 'all' is
- true) or a statement transaction (if 'all' is false).
+/**
+ This function flushes the trx-cache upon commit.
- The transaction cache will be truncated to either just before the last
- opened statement transaction (if 'all' is false), or reset completely (if
- 'all' is true).
- */
+ @param thd The thread whose transaction should be flushed
+ @param cache_mngr Pointer to the cache manager
+
+ @return
+ nonzero if an error pops up when flushing the cache.
+*/
+static inline int
+binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr)
+{
+ Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
+ TRUE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE));
+}
+
+/**
+ This function flushes the trx-cache upon rollback.
+
+ @param thd The thread whose transaction should be flushed
+ @param cache_mngr Pointer to the cache manager
+
+ @return
+ nonzero if an error pops up when flushing the cache.
+*/
+static inline int
+binlog_rollback_flush_trx_cache(THD *thd, bool all,
+ binlog_cache_mngr *cache_mngr)
+{
+ Query_log_event end_evt(thd, STRING_WITH_LEN("ROLLBACK"),
+ TRUE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE));
+}
+
+/**
+ This function flushes the trx-cache upon commit.
+
+ @param thd The thread whose transaction should be flushed
+ @param cache_mngr Pointer to the cache manager
+ @param xid Transaction Id
+
+ @return
+ nonzero if an error pops up when flushing the cache.
+*/
+static inline int
+binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr,
+ bool all, my_xid xid)
+{
+ if (xid)
+ {
+ Xid_log_event end_evt(thd, xid, TRUE);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
+ }
+ else
+ {
+ /*
+ Empty xid occurs in XA COMMIT ... ONE PHASE.
+ In this case, we do not have a MySQL xid for the transaction, and the
+ external XA transaction coordinator will have to handle recovery if
+ needed. So we end the transaction with a plain COMMIT query event.
+ */
+ Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
+ TRUE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
+ }
+}
+
+/**
+ This function truncates the transactional cache upon committing or rolling
+ back either a transaction or a statement.
+
+ @param thd The thread whose transaction should be flushed
+ @param cache_mngr Pointer to the cache data to be flushed
+ @param all @c true means truncate the transaction, otherwise the
+ statement must be truncated.
+
+ @return
+ nonzero if an error pops up when truncating the transactional cache.
+*/
static int
-binlog_truncate_trx_cache(THD *thd, binlog_trx_data *trx_data, bool all)
+binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
{
DBUG_ENTER("binlog_truncate_trx_cache");
- int error= 0;
- DBUG_PRINT("enter", ("transaction: %s", all ? "all" : "stmt"));
- DBUG_PRINT("info", ("thd->options={ %s%s}",
- FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
- FLAGSTR(thd->options, OPTION_BEGIN)));
-
+ int error=0;
/*
- ROLLBACK with nothing to replicate: i.e., rollback of only transactional
- tables.
+ This function handles transactional changes and as such this flag
+ equals to true.
*/
+ bool const is_transactional= TRUE;
+
+ DBUG_PRINT("info", ("thd->options={ %s %s}, transaction: %s",
+ FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT),
+ FLAGSTR(thd->variables.option_bits, OPTION_BEGIN),
+ all ? "all" : "stmt"));
+ thd->binlog_remove_pending_rows_event(TRUE, is_transactional);
/*
If rolling back an entire transaction or a single statement not
inside a transaction, we reset the transaction cache.
-
- If rolling back a statement in a transaction, we truncate the
- transaction cache to remove the statement.
- */
- thd->binlog_remove_pending_rows_event(TRUE);
- if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
+ */
+ if (ending_trans(thd, all))
{
- if (trx_data->has_incident())
+ if (cache_mngr->trx_cache.has_incident())
error= mysql_bin_log.write_incident(thd);
- trx_data->reset();
+
+ thd->clear_binlog_table_maps();
+
+ cache_mngr->reset(false, true);
}
- else // ...statement
- trx_data->truncate(trx_data->before_stmt_pos);
+ /*
+ If rolling back a statement in a transaction, we truncate the
+ transaction cache to remove the statement.
+ */
+ else
+ cache_mngr->trx_cache.restore_prev_position();
- DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL);
+ DBUG_ASSERT(thd->binlog_get_pending_rows_event(is_transactional) == NULL);
DBUG_RETURN(error);
}
@@ -1566,8 +1825,7 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
/**
This function is called once after each statement.
- It has the responsibility to flush the transaction cache to the
- binlog file on commits.
+ It has the responsibility to flush the caches to the binary log on commits.
@param hton The binlog handlerton.
@param thd The client thread that executes the transaction.
@@ -1580,53 +1838,50 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
{
int error= 0;
DBUG_ENTER("binlog_commit");
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
-
- if (trx_data->empty())
- {
- // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid()
- trx_data->reset();
- DBUG_RETURN(0);
- }
-
- /*
- We flush the cache if:
-
- - we are committing a transaction or;
- - no statement was committed before and just non-transactional
- tables were updated.
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- Otherwise, we collect the changes.
- */
DBUG_PRINT("debug",
- ("all: %d, empty: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
+ ("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
all,
- YESNO(trx_data->empty()),
+ YESNO(thd->in_multi_stmt_transaction_mode()),
YESNO(thd->transaction.all.modified_non_trans_table),
YESNO(thd->transaction.stmt.modified_non_trans_table)));
- if (ending_trans(thd, all) ||
- (trans_has_no_stmt_committed(thd, all) &&
- !stmt_has_updated_trans_table(thd) && stmt_has_updated_non_trans_table(thd)))
+
+ if (!cache_mngr->stmt_cache.empty())
+ {
+ error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr);
+ }
+
+ if (cache_mngr->trx_cache.empty())
{
- Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
- error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all);
+ /*
+ we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid()
+ */
+ cache_mngr->reset(false, true);
+ DBUG_RETURN(error);
}
- trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0;
+ /*
+ We commit the transaction if:
+ - We are not in a transaction and committing a statement, or
+ - We are in a transaction and a full transaction is committed.
+ Otherwise, we accumulate the changes.
+ */
+ if (!error && ending_trans(thd, all))
+ error= binlog_commit_flush_trx_cache(thd, all, cache_mngr);
+ /*
+ This is part of the stmt rollback.
+ */
if (!all)
- trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt commit
+ cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
+
DBUG_RETURN(error);
}
/**
- This function is called when a transaction involving a transactional
- table is rolled back.
-
- It has the responsibility to flush the transaction cache to the
- binlog file. However, if the transaction does not involve
- non-transactional tables, nothing needs to be logged.
+ This function is called when a transaction or a statement is rolled back.
@param hton The binlog handlerton.
@param thd The client thread that executes the transaction.
@@ -1638,19 +1893,38 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
static int binlog_rollback(handlerton *hton, THD *thd, bool all)
{
DBUG_ENTER("binlog_rollback");
- int error=0;
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
-
- if (trx_data->empty()) {
- trx_data->reset();
- DBUG_RETURN(0);
- }
+ int error= 0;
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
YESNO(all),
YESNO(thd->transaction.all.modified_non_trans_table),
YESNO(thd->transaction.stmt.modified_non_trans_table)));
+
+ /*
+ If an incident event is set we do not flush the content of the statement
+ cache because it may be corrupted.
+ */
+ if (cache_mngr->stmt_cache.has_incident())
+ {
+ error= mysql_bin_log.write_incident(thd);
+ cache_mngr->reset(true, false);
+ }
+ else if (!cache_mngr->stmt_cache.empty())
+ {
+ error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr);
+ }
+
+ if (cache_mngr->trx_cache.empty())
+ {
+ /*
+ we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid()
+ */
+ cache_mngr->reset(false, true);
+ DBUG_RETURN(error);
+ }
+
if (mysql_bin_log.check_write_error(thd))
{
/*
@@ -1661,68 +1935,61 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
*/
DBUG_ASSERT(!all);
/*
- We reach this point if either only transactional tables were modified or
- the effect of a statement that did not get into the binlog needs to be
- rolled back. In the latter case, if a statement changed non-transactional
- tables or had the OPTION_KEEP_LOG associated, we write an incident event
- to the binlog in order to stop slaves and notify users that some changes
- on the master did not get into the binlog and slaves will be inconsistent.
- On the other hand, if a statement is transactional, we just safely roll it
- back.
+ We reach this point if the effect of a statement did not properly get into
+ a cache and need to be rolled back.
*/
- if ((stmt_has_updated_non_trans_table(thd) ||
- (thd->options & OPTION_KEEP_LOG)) &&
- mysql_bin_log.check_write_error(thd))
- trx_data->set_incident();
- error= binlog_truncate_trx_cache(thd, trx_data, all);
+ error |= binlog_truncate_trx_cache(thd, cache_mngr, all);
}
- else
- {
- /*
- We flush the cache with a rollback, wrapped in a begin/rollback if:
- . aborting a transaction that modified a non-transactional table or
- the OPTION_KEEP_LOG is activate.
- . aborting a statement that modified both transactional and
- non-transactional tables but which is not in the boundaries of any
- transaction or there was no early change;
+ else if (!error)
+ {
+ /*
+ We flush the cache wrapped in a beging/rollback if:
+ . aborting a single or multi-statement transaction and;
+ . the OPTION_KEEP_LOG is active or;
+ . the format is STMT and a non-trans table was updated or;
+ . the format is MIXED and a temporary non-trans table was
+ updated or;
+ . the format is MIXED, non-trans table was updated and
+ aborting a single statement transaction;
*/
- if ((ending_trans(thd, all) &&
- (trans_has_updated_non_trans_table(thd) ||
- (thd->options & OPTION_KEEP_LOG))) ||
- (trans_has_no_stmt_committed(thd, all) &&
- stmt_has_updated_non_trans_table(thd) &&
- thd->current_stmt_binlog_row_based))
- {
- Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0);
- error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all);
- }
+ if (ending_trans(thd, all) &&
+ ((thd->variables.option_bits & OPTION_KEEP_LOG) ||
+ (trans_has_updated_non_trans_table(thd) &&
+ thd->variables.binlog_format == BINLOG_FORMAT_STMT) ||
+ (cache_mngr->trx_cache.changes_to_non_trans_temp_table() &&
+ thd->variables.binlog_format == BINLOG_FORMAT_MIXED) ||
+ (trans_has_updated_non_trans_table(thd) &&
+ ending_single_stmt_trans(thd,all) &&
+ thd->variables.binlog_format == BINLOG_FORMAT_MIXED)))
+ error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr);
/*
- Otherwise, we simply truncate the cache as there is no change on
- non-transactional tables as follows.
+ Truncate the cache if:
+ . aborting a single or multi-statement transaction or;
+ . the OPTION_KEEP_LOG is not active and;
+ . the format is not STMT or no non-trans table was
+ updated and;
+ . the format is not MIXED or no temporary non-trans table
+ was updated.
*/
else if (ending_trans(thd, all) ||
- (!(thd->options & OPTION_KEEP_LOG) && !stmt_has_updated_non_trans_table(thd)))
- error= binlog_truncate_trx_cache(thd, trx_data, all);
+ (!(thd->variables.option_bits & OPTION_KEEP_LOG) &&
+ (!stmt_has_updated_non_trans_table(thd) ||
+ thd->variables.binlog_format != BINLOG_FORMAT_STMT) &&
+ (!cache_mngr->trx_cache.changes_to_non_trans_temp_table() ||
+ thd->variables.binlog_format != BINLOG_FORMAT_MIXED)))
+ error= binlog_truncate_trx_cache(thd, cache_mngr, all);
}
- if (!all)
- trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback
- DBUG_RETURN(error);
-}
-/**
- Cleanup the cache.
-
- @param thd The client thread that wants to clean up the cache.
-*/
-void MYSQL_BIN_LOG::reset_gathered_updates(THD *thd)
-{
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ /*
+ This is part of the stmt rollback.
+ */
+ if (!all)
+ cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
- trx_data->reset();
+ DBUG_RETURN(error);
}
-void MYSQL_BIN_LOG::set_write_error(THD *thd)
+void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::set_write_error");
@@ -1732,9 +1999,20 @@ void MYSQL_BIN_LOG::set_write_error(THD *thd)
DBUG_VOID_RETURN;
if (my_errno == EFBIG)
- my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(MY_WME));
+ {
+ if (is_transactional)
+ {
+ my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(MY_WME));
+ }
+ else
+ {
+ my_message(ER_STMT_CACHE_FULL, ER(ER_STMT_CACHE_FULL), MYF(MY_WME));
+ }
+ }
else
+ {
my_error(ER_ERROR_ON_WRITE, MYF(MY_WME), name, errno);
+ }
DBUG_VOID_RETURN;
}
@@ -1748,9 +2026,10 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd)
if (!thd->is_error())
DBUG_RETURN(checked);
- switch (thd->main_da.sql_errno())
+ switch (thd->stmt_da->sql_errno())
{
case ER_TRANS_CACHE_FULL:
+ case ER_STMT_CACHE_FULL:
case ER_ERROR_ON_WRITE:
case ER_BINLOG_LOGGING_IMPOSSIBLE:
checked= TRUE;
@@ -1800,7 +2079,7 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
DBUG_RETURN(1);
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
Query_log_event qinfo(thd, log_query.ptr(), log_query.length(),
- TRUE, TRUE, errcode);
+ TRUE, FALSE, TRUE, errcode);
DBUG_RETURN(mysql_bin_log.write(&qinfo));
}
@@ -1813,8 +2092,8 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command.
*/
- if (unlikely(trans_has_updated_non_trans_table(thd) ||
- (thd->options & OPTION_KEEP_LOG)))
+ if (unlikely(trans_has_updated_non_trans_table(thd) ||
+ (thd->variables.option_bits & OPTION_KEEP_LOG)))
{
String log_query;
if (log_query.append(STRING_WITH_LEN("ROLLBACK TO ")) ||
@@ -1824,7 +2103,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
DBUG_RETURN(1);
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
Query_log_event qinfo(thd, log_query.ptr(), log_query.length(),
- TRUE, TRUE, errcode);
+ TRUE, FALSE, TRUE, errcode);
DBUG_RETURN(mysql_bin_log.write(&qinfo));
}
binlog_trans_log_truncate(thd, *(my_off_t*)sv);
@@ -1858,8 +2137,9 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg)
File file;
DBUG_ENTER("open_binlog");
- if ((file = my_open(log_file_name, O_RDONLY | O_BINARY | O_SHARE,
- MYF(MY_WME))) < 0)
+ if ((file= mysql_file_open(key_file_binlog,
+ log_file_name, O_RDONLY | O_BINARY | O_SHARE,
+ MYF(MY_WME))) < 0)
{
sql_print_error("Failed to open log (file '%s', errno %d)",
log_file_name, my_errno);
@@ -1881,13 +2161,13 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg)
err:
if (file >= 0)
{
- my_close(file,MYF(0));
+ mysql_file_close(file, MYF(0));
end_io_cache(log);
}
DBUG_RETURN(-1);
}
-#ifdef __NT__
+#ifdef _WIN32
static int eventSource = 0;
static void setup_windows_event_source()
@@ -1922,28 +2202,33 @@ static void setup_windows_event_source()
RegCloseKey(hRegKey);
}
-#endif /* __NT__ */
+#endif /* _WIN32 */
/**
Find a unique filename for 'filename.#'.
- Set '#' to a number as low as possible.
+ Set '#' to the number next to the maximum found in the most
+ recent log file extension.
+
+ This function will return nonzero if: (i) the generated name
+ exceeds FN_REFLEN; (ii) if the number of extensions is exhausted;
+ or (iii) some other error happened while examining the filesystem.
@return
- nonzero if not possible to get unique filename
+ nonzero if not possible to get unique filename.
*/
static int find_uniq_filename(char *name)
{
- long number;
uint i;
- char buff[FN_REFLEN];
+ char buff[FN_REFLEN], ext_buf[FN_REFLEN];
struct st_my_dir *dir_info;
reg1 struct fileinfo *file_info;
- ulong max_found=0;
+ ulong max_found= 0, next= 0, number= 0;
size_t buf_length, length;
char *start, *end;
+ int error= 0;
DBUG_ENTER("find_uniq_filename");
LINT_INIT(number);
@@ -1952,16 +2237,16 @@ static int find_uniq_filename(char *name)
end= strend(start);
*end='.';
- length= (size_t) (end-start+1);
+ length= (size_t) (end - start + 1);
if ((DBUG_EVALUATE_IF("error_unique_log_filename", 1,
- !(dir_info = my_dir(buff,MYF(MY_DONT_SORT))))))
+ !(dir_info= my_dir(buff,MYF(MY_DONT_SORT))))))
{ // This shouldn't happen
strmov(end,".1"); // use name+1
DBUG_RETURN(1);
}
file_info= dir_info->dir_entry;
- for (i=dir_info->number_off_files ; i-- ; file_info++)
+ for (i= dir_info->number_off_files ; i-- ; file_info++)
{
if (memcmp(file_info->name, start, length) == 0 &&
test_if_number(file_info->name+length, &number,0))
@@ -1971,8 +2256,52 @@ static int find_uniq_filename(char *name)
}
my_dirend(dir_info);
+ /* check if reached the maximum possible extension number */
+ if ((max_found == MAX_LOG_UNIQUE_FN_EXT))
+ {
+ sql_print_error("Log filename extension number exhausted: %06lu. \
+Please fix this by archiving old logs and \
+updating the index files.", max_found);
+ error= 1;
+ goto end;
+ }
+
+ next= max_found + 1;
+ if (sprintf(ext_buf, "%06lu", next)<0)
+ {
+ error= 1;
+ goto end;
+ }
*end++='.';
- DBUG_RETURN((sprintf(end,"%06ld",max_found+1) < 0));
+
+ /*
+ Check if the generated extension size + the file name exceeds the
+ buffer size used. If one did not check this, then the filename might be
+ truncated, resulting in error.
+ */
+ if (((strlen(ext_buf) + (end - name)) >= FN_REFLEN))
+ {
+ sql_print_error("Log filename too large: %s%s (%zu). \
+Please fix this by archiving old logs and updating the \
+index files.", name, ext_buf, (strlen(ext_buf) + (end - name)));
+ error= 1;
+ goto end;
+ }
+
+ if (sprintf(end, "%06lu", next)<0)
+ {
+ error= 1;
+ goto end;
+ }
+
+ /* print warning if reaching the end of available extensions. */
+ if ((next > (MAX_LOG_UNIQUE_FN_EXT - LOG_WARN_UNIQUE_FN_EXT_LEFT)))
+ sql_print_warning("Next log extension: %lu. \
+Remaining log filename extensions: %lu. \
+Please consider archiving some logs.", next, (MAX_LOG_UNIQUE_FN_EXT - next));
+
+end:
+ DBUG_RETURN(error);
}
@@ -2024,7 +2353,11 @@ bool MYSQL_LOG::init_and_set_log_file_name(const char *log_name,
1 error
*/
-bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
+bool MYSQL_LOG::open(
+#ifdef HAVE_PSI_INTERFACE
+ PSI_file_key log_file_key,
+#endif
+ const char *log_name, enum_log_type log_type_arg,
const char *new_name, enum cache_type io_cache_type_arg)
{
char buff[FN_REFLEN];
@@ -2052,10 +2385,16 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
db[0]= 0;
- if ((file= my_open(log_file_name, open_flags,
- MYF(MY_WME | ME_WAITTANG))) < 0 ||
+#ifdef HAVE_PSI_INTERFACE
+ /* Keep the key for reopen */
+ m_log_file_key= log_file_key;
+#endif
+
+ if ((file= mysql_file_open(log_file_key,
+ log_file_name, open_flags,
+ MYF(MY_WME | ME_WAITTANG))) < 0 ||
init_io_cache(&log_file, file, IO_SIZE, io_cache_type,
- my_tell(file, MYF(MY_WME)), 0,
+ mysql_file_tell(file, MYF(MY_WME)), 0,
MYF(MY_WME | MY_NABP |
((log_type == LOG_BIN) ? MY_WAIT_IF_FULL : 0))))
goto err;
@@ -2067,7 +2406,7 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
#ifdef EMBEDDED_LIBRARY
"embedded library\n",
my_progname, server_version, MYSQL_COMPILATION_COMMENT
-#elif __NT__
+#elif _WIN32
"started with:\nTCP Port: %d, Named Pipe: %s\n",
my_progname, server_version, MYSQL_COMPILATION_COMMENT,
mysqld_port, mysqld_unix_port
@@ -2093,9 +2432,10 @@ Turning logging off for the whole duration of the MySQL server process. \
To turn it on again: fix the cause, \
shutdown the MySQL server and restart it.", name, errno);
if (file >= 0)
- my_close(file, MYF(0));
+ mysql_file_close(file, MYF(0));
end_io_cache(&log_file);
- safeFree(name);
+ my_free(name);
+ name= NULL;
log_state= LOG_CLOSED;
DBUG_RETURN(1);
}
@@ -2117,7 +2457,7 @@ void MYSQL_LOG::init_pthread_objects()
{
DBUG_ASSERT(inited == 0);
inited= 1;
- (void) pthread_mutex_init(&LOCK_log, MY_MUTEX_INIT_SLOW);
+ mysql_mutex_init(key_LOG_LOCK_log, &LOCK_log, MY_MUTEX_INIT_SLOW);
}
/*
@@ -2142,13 +2482,13 @@ void MYSQL_LOG::close(uint exiting)
{
end_io_cache(&log_file);
- if (my_sync(log_file.file, MYF(MY_WME)) && ! write_error)
+ if (mysql_file_sync(log_file.file, MYF(MY_WME)) && ! write_error)
{
write_error= 1;
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
}
- if (my_close(log_file.file, MYF(MY_WME)) && ! write_error)
+ if (mysql_file_close(log_file.file, MYF(MY_WME)) && ! write_error)
{
write_error= 1;
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
@@ -2156,7 +2496,8 @@ void MYSQL_LOG::close(uint exiting)
}
log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED;
- safeFree(name);
+ my_free(name);
+ name= NULL;
DBUG_VOID_RETURN;
}
@@ -2168,7 +2509,7 @@ void MYSQL_LOG::cleanup()
if (inited)
{
inited= 0;
- (void) pthread_mutex_destroy(&LOCK_log);
+ mysql_mutex_destroy(&LOCK_log);
close(0);
}
DBUG_VOID_RETURN;
@@ -2218,7 +2559,7 @@ void MYSQL_QUERY_LOG::reopen_file()
DBUG_VOID_RETURN;
}
- pthread_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_log);
save_name= name;
name= 0; // Don't free name
@@ -2228,10 +2569,14 @@ void MYSQL_QUERY_LOG::reopen_file()
Note that at this point, log_state != LOG_CLOSED (important for is_open()).
*/
- open(save_name, log_type, 0, io_cache_type);
- my_free(save_name, MYF(0));
+ open(
+#ifdef HAVE_PSI_INTERFACE
+ m_log_file_key,
+#endif
+ save_name, log_type, 0, io_cache_type);
+ my_free(save_name);
- pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_log);
DBUG_VOID_RETURN;
}
@@ -2273,7 +2618,7 @@ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host,
struct tm start;
uint time_buff_len= 0;
- (void) pthread_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_log);
/* Test if someone closed between the is_open test and lock */
if (is_open())
@@ -2322,7 +2667,7 @@ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host,
goto err;
}
- (void) pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_log);
return FALSE;
err:
@@ -2331,7 +2676,7 @@ err:
write_error= 1;
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
}
- (void) pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_log);
return TRUE;
}
@@ -2373,11 +2718,11 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
bool error= 0;
DBUG_ENTER("MYSQL_QUERY_LOG::write");
- (void) pthread_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_log);
if (!is_open())
{
- (void) pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(0);
}
@@ -2506,7 +2851,7 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
}
}
}
- (void) pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
@@ -2539,12 +2884,13 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
-MYSQL_BIN_LOG::MYSQL_BIN_LOG()
+MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
:bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
need_start_event(TRUE),
group_commit_queue(0), group_commit_queue_busy(FALSE),
num_commits(0), num_group_commits(0),
- is_relay_log(0),
+ sync_period_ptr(sync_period),
+ is_relay_log(0), signal_cnt(0),
checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
description_event_for_exec(0), description_event_for_queue(0)
@@ -2571,9 +2917,9 @@ void MYSQL_BIN_LOG::cleanup()
close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT);
delete description_event_for_queue;
delete description_event_for_exec;
- (void) pthread_mutex_destroy(&LOCK_log);
- (void) pthread_mutex_destroy(&LOCK_index);
- (void) pthread_cond_destroy(&update_cond);
+ mysql_mutex_destroy(&LOCK_log);
+ mysql_mutex_destroy(&LOCK_index);
+ mysql_cond_destroy(&update_cond);
}
DBUG_VOID_RETURN;
}
@@ -2592,17 +2938,11 @@ void MYSQL_BIN_LOG::init(bool no_auto_events_arg, ulong max_size_arg)
void MYSQL_BIN_LOG::init_pthread_objects()
{
- DBUG_ASSERT(inited == 0);
- inited= 1;
- (void) pthread_mutex_init(&LOCK_log, MY_MUTEX_INIT_SLOW);
- /*
- LOCK_index and LOCK_log are taken in wrong order
- Can be seen with 'mysql-test-run ndb.ndb_binlog_basic'
- */
- (void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index",
- MYF_NO_DEADLOCK_DETECTION);
- (void) pthread_cond_init(&update_cond, 0);
- (void) pthread_cond_init(&COND_queue_busy, 0);
+ MYSQL_LOG::init_pthread_objects();
+ mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW);
+ mysql_mutex_setflags(&LOCK_index, MYF_NO_DEADLOCK_DETECTION);
+ mysql_cond_init(m_key_update_cond, &update_cond, 0);
+ mysql_cond_init(m_key_COND_queue_busy, &COND_queue_busy, 0);
}
@@ -2625,23 +2965,25 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg,
}
fn_format(index_file_name, index_file_name_arg, mysql_data_home,
".index", opt);
- if ((index_file_nr= my_open(index_file_name,
- O_RDWR | O_CREAT | O_BINARY ,
- MYF(MY_WME))) < 0 ||
- my_sync(index_file_nr, MYF(MY_WME)) ||
+ if ((index_file_nr= mysql_file_open(m_key_file_log_index,
+ index_file_name,
+ O_RDWR | O_CREAT | O_BINARY,
+ MYF(MY_WME))) < 0 ||
+ mysql_file_sync(index_file_nr, MYF(MY_WME)) ||
init_io_cache(&index_file, index_file_nr,
IO_SIZE, WRITE_CACHE,
- my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
- 0, MYF(MY_WME | MY_WAIT_IF_FULL)) ||
+ mysql_file_seek(index_file_nr, 0L, MY_SEEK_END, MYF(0)),
+ 0, MYF(MY_WME | MY_WAIT_IF_FULL)) ||
DBUG_EVALUATE_IF("fault_injection_openning_index", 1, 0))
{
/*
TODO: all operations creating/deleting the index file or a log, should
call my_sync_dir() or my_sync_dir_by_file() to be durable.
- TODO: file creation should be done with my_create() not my_open().
+ TODO: file creation should be done with mysql_file_create()
+ not mysql_file_open().
*/
if (index_file_nr >= 0)
- my_close(index_file_nr,MYF(0));
+ mysql_file_close(index_file_nr, MYF(0));
return TRUE;
}
@@ -2737,8 +3079,11 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
write_error= 0;
/* open the main log file */
- if (MYSQL_LOG::open(log_name, log_type_arg, new_name,
- io_cache_type_arg))
+ if (MYSQL_LOG::open(
+#ifdef HAVE_PSI_INTERFACE
+ m_key_file_log,
+#endif
+ log_name, log_type_arg, new_name, io_cache_type_arg))
{
#ifdef HAVE_REPLICATION
close_purge_index_file();
@@ -2798,7 +3143,6 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (!s.is_valid())
goto err;
s.dont_set_created= null_created_arg;
- s.pre_55_writing_direct();
if (s.write(&log_file))
goto err;
bytes_written+= s.data_written;
@@ -2830,19 +3174,18 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/* Don't set log_pos in event header */
description_event_for_queue->set_artificial_event();
- description_event_for_queue->pre_55_writing_direct();
if (description_event_for_queue->write(&log_file))
goto err;
bytes_written+= description_event_for_queue->data_written;
}
if (flush_io_cache(&log_file) ||
- my_sync(log_file.file, MYF(MY_WME)))
+ mysql_file_sync(log_file.file, MYF(MY_WME)))
goto err;
- pthread_mutex_lock(&LOCK_commit_ordered);
+ mysql_mutex_lock(&LOCK_commit_ordered);
strmake(last_commit_pos_file, log_file_name,
sizeof(last_commit_pos_file)-1);
last_commit_pos_offset= my_b_tell(&log_file);
- pthread_mutex_unlock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
if (write_file_name_to_index_file)
{
@@ -2862,7 +3205,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
strlen(log_file_name)) ||
my_b_write(&index_file, (uchar*) "\n", 1) ||
flush_io_cache(&index_file) ||
- my_sync(index_file.file, MYF(MY_WME)))
+ mysql_file_sync(index_file.file, MYF(MY_WME)))
goto err;
#ifdef HAVE_REPLICATION
@@ -2889,20 +3232,17 @@ Turning logging off for the whole duration of the MySQL server process. \
To turn it on again: fix the cause, \
shutdown the MySQL server and restart it.", name, errno);
if (file >= 0)
- my_close(file,MYF(0));
- end_io_cache(&log_file);
- end_io_cache(&index_file);
- safeFree(name);
- log_state= LOG_CLOSED;
+ mysql_file_close(file, MYF(0));
+ close(LOG_CLOSE_INDEX);
DBUG_RETURN(1);
}
int MYSQL_BIN_LOG::get_current_log(LOG_INFO* linfo)
{
- pthread_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_log);
int ret = raw_get_current_log(linfo);
- pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_log);
return ret;
}
@@ -2942,19 +3282,20 @@ static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset)
for (;; offset+= bytes_read)
{
- (void) my_seek(file, offset, MY_SEEK_SET, MYF(0));
- if ((bytes_read= (int) my_read(file, io_buf, sizeof(io_buf), MYF(MY_WME)))
+ mysql_file_seek(file, offset, MY_SEEK_SET, MYF(0));
+ if ((bytes_read= (int) mysql_file_read(file, io_buf, sizeof(io_buf),
+ MYF(MY_WME)))
< 0)
goto err;
if (!bytes_read)
break; // end of file
- (void) my_seek(file, offset-init_offset, MY_SEEK_SET, MYF(0));
- if (my_write(file, io_buf, bytes_read, MYF(MY_WME | MY_NABP)))
+ mysql_file_seek(file, offset-init_offset, MY_SEEK_SET, MYF(0));
+ if (mysql_file_write(file, io_buf, bytes_read, MYF(MY_WME | MY_NABP)))
goto err;
}
/* The following will either truncate the file or fill the end with \n' */
- if (my_chsize(file, offset - init_offset, '\n', MYF(MY_WME)) ||
- my_sync(file, MYF(MY_WME)))
+ if (mysql_file_chsize(file, offset - init_offset, '\n', MYF(MY_WME)) ||
+ mysql_file_sync(file, MYF(MY_WME)))
goto err;
/* Reset data in old index cache */
@@ -2993,18 +3334,33 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
bool need_lock)
{
int error= 0;
- char *fname= linfo->log_file_name;
- uint log_name_len= log_name ? (uint) strlen(log_name) : 0;
+ char *full_fname= linfo->log_file_name;
+ char full_log_name[FN_REFLEN], fname[FN_REFLEN];
+ uint log_name_len= 0, fname_len= 0;
DBUG_ENTER("find_log_pos");
- DBUG_PRINT("enter",("log_name: %s", log_name ? log_name : "NULL"));
+ full_log_name[0]= full_fname[0]= 0;
/*
Mutex needed because we need to make sure the file pointer does not
move from under our feet
*/
if (need_lock)
- pthread_mutex_lock(&LOCK_index);
- safe_mutex_assert_owner(&LOCK_index);
+ mysql_mutex_lock(&LOCK_index);
+ mysql_mutex_assert_owner(&LOCK_index);
+
+ // extend relative paths for log_name to be searched
+ if (log_name)
+ {
+ if(normalize_binlog_name(full_log_name, log_name, is_relay_log))
+ {
+ error= LOG_INFO_EOF;
+ goto end;
+ }
+ }
+
+ log_name_len= log_name ? (uint) strlen(full_log_name) : 0;
+ DBUG_PRINT("enter", ("log_name: %s, full_log_name: %s",
+ log_name ? log_name : "NULL", full_log_name));
/* As the file is flushed, we can't get an error here */
(void) reinit_io_cache(&index_file, READ_CACHE, (my_off_t) 0, 0, 0);
@@ -3013,8 +3369,10 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
{
uint length;
my_off_t offset= my_b_tell(&index_file);
- /* If we get 0 or 1 characters, this is the end of the file */
+ DBUG_EXECUTE_IF("simulate_find_log_pos_error",
+ error= LOG_INFO_EOF; break;);
+ /* If we get 0 or 1 characters, this is the end of the file */
if ((length= my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
{
/* Did not find the given entry; Return not found or error */
@@ -3022,21 +3380,30 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
break;
}
+ // extend relative paths and match against full path
+ if (normalize_binlog_name(full_fname, fname, is_relay_log))
+ {
+ error= LOG_INFO_EOF;
+ break;
+ }
+ fname_len= (uint) strlen(full_fname);
+
// if the log entry matches, null string matching anything
if (!log_name ||
- (log_name_len == length-1 && fname[log_name_len] == '\n' &&
- !memcmp(fname, log_name, log_name_len)))
+ (log_name_len == fname_len-1 && full_fname[log_name_len] == '\n' &&
+ !memcmp(full_fname, full_log_name, log_name_len)))
{
- DBUG_PRINT("info",("Found log file entry"));
- fname[length-1]=0; // remove last \n
+ DBUG_PRINT("info", ("Found log file entry"));
+ full_fname[fname_len-1]= 0; // remove last \n
linfo->index_file_start_offset= offset;
linfo->index_file_offset = my_b_tell(&index_file);
break;
}
}
+end:
if (need_lock)
- pthread_mutex_unlock(&LOCK_index);
+ mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
@@ -3069,11 +3436,12 @@ int MYSQL_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock)
{
int error= 0;
uint length;
- char *fname= linfo->log_file_name;
+ char fname[FN_REFLEN];
+ char *full_fname= linfo->log_file_name;
if (need_lock)
- pthread_mutex_lock(&LOCK_index);
- safe_mutex_assert_owner(&LOCK_index);
+ mysql_mutex_lock(&LOCK_index);
+ mysql_mutex_assert_owner(&LOCK_index);
/* As the file is flushed, we can't get an error here */
(void) reinit_io_cache(&index_file, READ_CACHE, linfo->index_file_offset, 0,
@@ -3085,12 +3453,23 @@ int MYSQL_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock)
error = !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
goto err;
}
- fname[length-1]=0; // kill \n
- linfo->index_file_offset = my_b_tell(&index_file);
+
+ if (fname[0] != 0)
+ {
+ if(normalize_binlog_name(full_fname, fname, is_relay_log))
+ {
+ error= LOG_INFO_EOF;
+ goto err;
+ }
+ length= strlen(full_fname);
+ }
+
+ full_fname[length-1]= 0; // kill \n
+ linfo->index_file_offset= my_b_tell(&index_file);
err:
if (need_lock)
- pthread_mutex_unlock(&LOCK_index);
+ mysql_mutex_unlock(&LOCK_index);
return error;
}
@@ -3116,6 +3495,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
{
LOG_INFO linfo;
bool error=0;
+ int err;
const char* save_name;
DBUG_ENTER("reset_logs");
@@ -3124,8 +3504,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
We need to get both locks to be sure that no one is trying to
write to the index log file.
*/
- pthread_mutex_lock(&LOCK_log);
- pthread_mutex_lock(&LOCK_index);
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_index);
/*
The following mutex is needed to ensure that no threads call
@@ -3133,7 +3513,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
thread. If the transaction involved MyISAM tables, it should go
into binlog even on rollback.
*/
- pthread_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_thread_count);
/* Save variables so that we can reopen the log */
save_name=name;
@@ -3149,9 +3529,13 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
We need to invert the steps and use the purge_index_file methods
in order to make the operation safe.
*/
- if (find_log_pos(&linfo, NullS, 0))
+
+ if ((err= find_log_pos(&linfo, NullS, 0)) != 0)
{
- error=1;
+ uint errcode= purge_log_get_error_code(err);
+ sql_print_error("Failed to locate old binlog or relay log files");
+ my_message(errcode, ER(errcode), MYF(0));
+ error= 1;
goto err;
}
@@ -3218,12 +3602,14 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
if (!open_index_file(index_file_name, 0, FALSE))
if ((error= open(save_name, log_type, 0, io_cache_type, no_auto_events, max_size, 0, FALSE)))
goto err;
- my_free((uchar*) save_name, MYF(0));
+ my_free((void *) save_name);
err:
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
- pthread_mutex_unlock(&LOCK_index);
- pthread_mutex_unlock(&LOCK_log);
+ if (error == 1)
+ name= const_cast<char*>(save_name);
+ mysql_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_index);
+ mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
@@ -3277,7 +3663,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
DBUG_ASSERT(rli->slave_running == 1);
DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
- pthread_mutex_lock(&LOCK_index);
+ mysql_mutex_lock(&LOCK_index);
to_purge_if_included= my_strdup(rli->group_relay_log_name, MYF(0));
/*
@@ -3321,19 +3707,19 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
DBUG_EXECUTE_IF("crash_before_purge_logs", DBUG_SUICIDE(););
- pthread_mutex_lock(&rli->log_space_lock);
+ mysql_mutex_lock(&rli->log_space_lock);
rli->relay_log.purge_logs(to_purge_if_included, included,
0, 0, &rli->log_space_total);
// Tell the I/O thread to take the relay_log_space_limit into account
rli->ignore_log_space_limit= 0;
- pthread_mutex_unlock(&rli->log_space_lock);
+ mysql_mutex_unlock(&rli->log_space_lock);
/*
Ok to broadcast after the critical region as there is no risk of
the mutex being destroyed by this thread later - this helps save
context switches
*/
- pthread_cond_broadcast(&rli->log_space_cond);
+ mysql_cond_broadcast(&rli->log_space_cond);
/*
* Need to update the log pos because purge logs has been called
@@ -3354,8 +3740,8 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
DBUG_ASSERT(!included || rli->linfo.index_file_start_offset == 0);
err:
- my_free(to_purge_if_included, MYF(0));
- pthread_mutex_unlock(&LOCK_index);
+ my_free(to_purge_if_included);
+ mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
@@ -3395,7 +3781,7 @@ int MYSQL_BIN_LOG::update_log_index(LOG_INFO* log_info, bool need_update_threads
LOG_INFO_EOF to_log not found
LOG_INFO_EMFILE too many files opened
LOG_INFO_FATAL if any other than ENOENT error from
- my_stat() or my_delete()
+ mysql_file_stat() or mysql_file_delete()
*/
int MYSQL_BIN_LOG::purge_logs(const char *to_log,
@@ -3412,7 +3798,7 @@ int MYSQL_BIN_LOG::purge_logs(const char *to_log,
DBUG_PRINT("info",("to_log= %s",to_log));
if (need_mutex)
- pthread_mutex_lock(&LOCK_index);
+ mysql_mutex_lock(&LOCK_index);
if ((error=find_log_pos(&log_info, to_log, 0 /*no mutex*/)))
{
sql_print_error("MYSQL_BIN_LOG::purge_logs was called with file %s not "
@@ -3475,7 +3861,7 @@ err:
DBUG_EXECUTE_IF("crash_purge_non_critical_after_update_index", DBUG_SUICIDE(););
if (need_mutex)
- pthread_mutex_unlock(&LOCK_index);
+ mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
@@ -3539,8 +3925,7 @@ int MYSQL_BIN_LOG::close_purge_index_file()
bool MYSQL_BIN_LOG::is_inited_purge_index_file()
{
- DBUG_ENTER("MYSQL_BIN_LOG::is_inited_purge_index_file");
- DBUG_RETURN (my_b_inited(&purge_index_file));
+ return my_b_inited(&purge_index_file);
}
int MYSQL_BIN_LOG::sync_purge_index_file()
@@ -3576,13 +3961,12 @@ int MYSQL_BIN_LOG::register_create_index_entry(const char *entry)
int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space,
bool need_mutex)
{
+ DBUG_ENTER("MYSQL_BIN_LOG:purge_index_entry");
MY_STAT s;
int error= 0;
LOG_INFO log_info;
LOG_INFO check_log_info;
- DBUG_ENTER("MYSQL_BIN_LOG:purge_index_entry");
-
DBUG_ASSERT(my_b_inited(&purge_index_file));
if ((error=reinit_io_cache(&purge_index_file, READ_CACHE, 0, 0, 0)))
@@ -3614,7 +3998,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space,
/* Get rid of the trailing '\n' */
log_info.log_file_name[length-1]= 0;
- if (!my_stat(log_info.log_file_name, &s, MYF(0)))
+ if (!mysql_file_stat(m_key_file_log, log_info.log_file_name, &s, MYF(0)))
{
if (my_errno == ENOENT)
{
@@ -3628,7 +4012,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space,
ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
log_info.log_file_name);
}
- sql_print_information("Failed to execute my_stat on file '%s'",
+ sql_print_information("Failed to execute mysql_file_stat on file '%s'",
log_info.log_file_name);
my_errno= 0;
}
@@ -3766,7 +4150,7 @@ err:
@retval
LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated
LOG_INFO_FATAL if any other than ENOENT error from
- my_stat() or my_delete()
+ mysql_file_stat() or mysql_file_delete()
*/
int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time)
@@ -3779,7 +4163,7 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time)
DBUG_ENTER("purge_logs_before_date");
- pthread_mutex_lock(&LOCK_index);
+ mysql_mutex_lock(&LOCK_index);
to_log[0]= 0;
if ((error=find_log_pos(&log_info, NullS, 0 /*no mutex*/)))
@@ -3789,7 +4173,8 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time)
!is_active(log_info.log_file_name) &&
!log_in_use(log_info.log_file_name))
{
- if (!my_stat(log_info.log_file_name, &stat_area, MYF(0)))
+ if (!mysql_file_stat(m_key_file_log,
+ log_info.log_file_name, &stat_area, MYF(0)))
{
if (my_errno == ENOENT)
{
@@ -3838,7 +4223,7 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time)
error= (to_log[0] ? purge_logs(to_log, 1, 0, 1, (ulonglong *) 0) : 0);
err:
- pthread_mutex_unlock(&LOCK_index);
+ mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
#endif /* HAVE_REPLICATION */
@@ -3924,11 +4309,11 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
}
if (need_lock)
- pthread_mutex_lock(&LOCK_log);
- pthread_mutex_lock(&LOCK_index);
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_index);
- safe_mutex_assert_owner(&LOCK_log);
- safe_mutex_assert_owner(&LOCK_index);
+ mysql_mutex_assert_owner(&LOCK_log);
+ mysql_mutex_assert_owner(&LOCK_index);
/*
if binlog is used as tc log, be sure all xids are "unlogged",
@@ -3942,12 +4327,12 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
if (prepared_xids)
{
tc_log_page_waits++;
- pthread_mutex_lock(&LOCK_prep_xids);
+ mysql_mutex_lock(&LOCK_prep_xids);
while (prepared_xids) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
- pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids);
+ mysql_cond_wait(&COND_prep_xids, &LOCK_prep_xids);
}
- pthread_mutex_unlock(&LOCK_prep_xids);
+ mysql_mutex_unlock(&LOCK_prep_xids);
}
/* Reuse old name if not binlog and not update log */
@@ -3978,7 +4363,6 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
*/
if (is_relay_log)
r.checksum_alg= relay_log_checksum_alg;
- r.pre_55_writing_direct();
DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) ||
(error= r.write(&log_file)))
@@ -4038,7 +4422,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
close_on_error= TRUE;
}
- my_free(old_name,MYF(0));
+ my_free(old_name);
end:
@@ -4066,8 +4450,8 @@ end:
}
if (need_lock)
- pthread_mutex_unlock(&LOCK_log);
- pthread_mutex_unlock(&LOCK_index);
+ mysql_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
@@ -4076,7 +4460,7 @@ end:
bool MYSQL_BIN_LOG::append(Log_event* ev)
{
bool error = 0;
- pthread_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_log);
DBUG_ENTER("MYSQL_BIN_LOG::append");
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
@@ -4084,7 +4468,6 @@ bool MYSQL_BIN_LOG::append(Log_event* ev)
Log_event::write() is smart enough to use my_b_write() or
my_b_append() depending on the kind of cache we have.
*/
- ev->pre_55_writing_direct();
if (ev->write(&log_file))
{
error=1;
@@ -4092,10 +4475,12 @@ bool MYSQL_BIN_LOG::append(Log_event* ev)
}
bytes_written+= ev->data_written;
DBUG_PRINT("info",("max_size: %lu",max_size));
+ if (flush_and_sync(0))
+ goto err;
if ((uint) my_b_append_tell(&log_file) > max_size)
error= new_file_without_locking();
err:
- pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_log);
signal_update(); // Safe as we don't call close
DBUG_RETURN(error);
}
@@ -4110,7 +4495,7 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...)
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
- safe_mutex_assert_owner(&LOCK_log);
+ mysql_mutex_assert_owner(&LOCK_log);
do
{
if (my_b_append(&log_file,(uchar*) buf,len))
@@ -4121,6 +4506,8 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...)
bytes_written += len;
} while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint)));
DBUG_PRINT("info",("max_size: %lu",max_size));
+ if (flush_and_sync(0))
+ goto err;
if ((uint) my_b_append_tell(&log_file) > max_size)
error= new_file_without_locking();
err:
@@ -4129,17 +4516,21 @@ err:
DBUG_RETURN(error);
}
-
-bool MYSQL_BIN_LOG::flush_and_sync()
+bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
{
int err=0, fd=log_file.file;
- safe_mutex_assert_owner(&LOCK_log);
+ if (synced)
+ *synced= 0;
+ mysql_mutex_assert_owner(&LOCK_log);
if (flush_io_cache(&log_file))
return 1;
- if (++sync_binlog_counter >= sync_binlog_period && sync_binlog_period)
+ uint sync_period= get_sync_period();
+ if (sync_period && ++sync_counter >= sync_period)
{
- sync_binlog_counter= 0;
- err=my_sync(fd, MYF(MY_WME));
+ sync_counter= 0;
+ err= mysql_file_sync(fd, MYF(MY_WME));
+ if (synced)
+ *synced= 1;
#ifndef DBUG_OFF
if (opt_binlog_dbug_fsync_sleep > 0)
my_sleep(opt_binlog_dbug_fsync_sleep);
@@ -4169,6 +4560,72 @@ bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param)
query_id_param >= thd->binlog_evt_union.first_query_id);
}
+/**
+ This function checks if a transactional table was updated by the
+ current transaction.
+
+ @param thd The client thread that executed the current statement.
+ @return
+ @c true if a transactional table was updated, @c false otherwise.
+*/
+bool
+trans_has_updated_trans_table(const THD* thd)
+{
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+
+ return (cache_mngr ? !cache_mngr->trx_cache.empty() : 0);
+}
+
+/**
+ This function checks if a transactional table was updated by the
+ current statement.
+
+ @param thd The client thread that executed the current statement.
+ @return
+ @c true if a transactional table was updated, @c false otherwise.
+*/
+bool
+stmt_has_updated_trans_table(const THD *thd)
+{
+ Ha_trx_info *ha_info;
+
+ for (ha_info= thd->transaction.stmt.ha_list; ha_info;
+ ha_info= ha_info->next())
+ {
+ if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton)
+ return (TRUE);
+ }
+ return (FALSE);
+}
+
+/**
+ This function checks if either a trx-cache or a non-trx-cache should
+ be used. If @c bin_log_direct_non_trans_update is active or the format
+ is either MIXED or ROW, the cache to be used depends on the flag @c
+ is_transactional.
+
+ On the other hand, if binlog_format is STMT or direct option is
+ OFF, the trx-cache should be used if and only if the statement is
+ transactional or the trx-cache is not empty. Otherwise, the
+ non-trx-cache should be used.
+
+ @param thd The client thread.
+ @param is_transactional The changes are related to a trx-table.
+ @return
+ @c true if a trx-cache should be used, @c false otherwise.
+*/
+bool use_trans_cache(const THD* thd, bool is_transactional)
+{
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+
+ return
+ ((thd->is_current_stmt_binlog_format_row() ||
+ thd->variables.binlog_direct_non_trans_update) ? is_transactional :
+ (is_transactional || !cache_mngr->trx_cache.empty()));
+}
+
/**
This function checks if a transaction, either a multi-statement
or a single statement transaction is about to commit or not.
@@ -4179,43 +4636,40 @@ bool MYSQL_BIN_LOG::is_query_in_union(THD *thd, query_id_t query_id_param)
@return
@c true if committing a transaction, otherwise @c false.
*/
-bool ending_trans(const THD* thd, const bool all)
+bool ending_trans(THD* thd, const bool all)
{
- return (all || (!all && !(thd->options &
- (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))));
+ return (all || ending_single_stmt_trans(thd, all));
}
/**
- This function checks if a non-transactional table was updated by
- the current transaction.
+ This function checks if a single statement transaction is about
+ to commit or not.
@param thd The client thread that executed the current statement.
+ @param all Committing a transaction (i.e. TRUE) or a statement
+ (i.e. FALSE).
@return
- @c true if a non-transactional table was updated, @c false
- otherwise.
+ @c true if committing a single statement transaction, otherwise
+ @c false.
*/
-bool trans_has_updated_non_trans_table(const THD* thd)
+bool ending_single_stmt_trans(THD* thd, const bool all)
{
- return (thd->transaction.all.modified_non_trans_table ||
- thd->transaction.stmt.modified_non_trans_table);
+ return (!all && !thd->in_multi_stmt_transaction_mode());
}
/**
- This function checks if any statement was committed and cached.
+ This function checks if a non-transactional table was updated by
+ the current transaction.
@param thd The client thread that executed the current statement.
- @param all Committing a transaction (i.e. TRUE) or a statement
- (i.e. FALSE).
@return
- @c true if at a statement was committed and cached, @c false
+ @c true if a non-transactional table was updated, @c false
otherwise.
*/
-bool trans_has_no_stmt_committed(const THD* thd, bool all)
+bool trans_has_updated_non_trans_table(const THD* thd)
{
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
-
- return (!all && !trx_data->at_least_one_stmt_committed);
+ return (thd->transaction.all.modified_non_trans_table ||
+ thd->transaction.stmt.modified_non_trans_table);
}
/**
@@ -4239,24 +4693,31 @@ bool stmt_has_updated_non_trans_table(const THD* thd)
int THD::binlog_setup_trx_data()
{
DBUG_ENTER("THD::binlog_setup_trx_data");
- binlog_trx_data *trx_data=
- (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
+ binlog_cache_mngr *cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
- if (trx_data)
+ if (cache_mngr)
DBUG_RETURN(0); // Already set up
- trx_data= (binlog_trx_data*) my_malloc(sizeof(binlog_trx_data), MYF(MY_ZEROFILL));
- if (!trx_data ||
- open_cached_file(&trx_data->trans_log, mysql_tmpdir,
+ cache_mngr= (binlog_cache_mngr*) my_malloc(sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL));
+ if (!cache_mngr ||
+ open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir,
+ LOG_PREFIX, binlog_stmt_cache_size, MYF(MY_WME)) ||
+ open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir,
LOG_PREFIX, binlog_cache_size, MYF(MY_WME)))
{
- my_free((uchar*)trx_data, MYF(MY_ALLOW_ZERO_PTR));
+ my_free(cache_mngr);
DBUG_RETURN(1); // Didn't manage to set it up
}
- thd_set_ha_data(this, binlog_hton, trx_data);
-
- trx_data= new (thd_get_ha_data(this, binlog_hton)) binlog_trx_data;
+ thd_set_ha_data(this, binlog_hton, cache_mngr);
+ cache_mngr= new (thd_get_ha_data(this, binlog_hton))
+ binlog_cache_mngr(max_binlog_stmt_cache_size,
+ max_binlog_cache_size,
+ &binlog_stmt_cache_use,
+ &binlog_stmt_cache_disk_use,
+ &binlog_cache_use,
+ &binlog_cache_disk_use);
DBUG_RETURN(0);
}
@@ -4273,11 +4734,10 @@ int THD::binlog_setup_trx_data()
- Start a transaction if not in autocommit mode or if a BEGIN
statement has been seen.
- - Start a statement transaction to allow us to truncate the binary
- log.
+ - Start a statement transaction to allow us to truncate the cache.
- Save the currrent binlog position so that we can roll back the
- statement by truncating the transaction log.
+ statement by truncating the cache.
We only update the saved position if the old one was undefined,
the reason is that there are some cases (e.g., for CREATE-SELECT)
@@ -4291,18 +4751,18 @@ int THD::binlog_setup_trx_data()
void
THD::binlog_start_trans_and_stmt()
{
- binlog_trx_data *trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
+ binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
DBUG_ENTER("binlog_start_trans_and_stmt");
- DBUG_PRINT("enter", ("trx_data: 0x%lx trx_data->before_stmt_pos: %lu",
- (long) trx_data,
- (trx_data ? (ulong) trx_data->before_stmt_pos :
+ DBUG_PRINT("enter", ("cache_mngr: %p cache_mngr->trx_cache.get_prev_position(): %lu",
+ cache_mngr,
+ (cache_mngr ? (ulong) cache_mngr->trx_cache.get_prev_position() :
(ulong) 0)));
- if (trx_data == NULL ||
- trx_data->before_stmt_pos == MY_OFF_T_UNDEF)
+ if (cache_mngr == NULL ||
+ cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF)
{
this->binlog_set_stmt_begin();
- if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
+ if (in_multi_stmt_transaction_mode())
trans_register_ha(this, TRUE, binlog_hton);
trans_register_ha(this, FALSE, binlog_hton);
/*
@@ -4320,47 +4780,57 @@ THD::binlog_start_trans_and_stmt()
}
void THD::binlog_set_stmt_begin() {
- binlog_trx_data *trx_data=
- (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
+ binlog_cache_mngr *cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
/*
- The call to binlog_trans_log_savepos() might create the trx_data
+ The call to binlog_trans_log_savepos() might create the cache_mngr
structure, if it didn't exist before, so we save the position
into an auto variable and then write it into the transaction
- data for the binary log (i.e., trx_data).
+ data for the binary log (i.e., cache_mngr).
*/
my_off_t pos= 0;
binlog_trans_log_savepos(this, &pos);
- trx_data= (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
- trx_data->before_stmt_pos= pos;
+ cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
+ cache_mngr->trx_cache.set_prev_position(pos);
}
static int
binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
{
int err= 0;
- binlog_trx_data *trx_data;
DBUG_ENTER("binlog_start_consistent_snapshot");
thd->binlog_setup_trx_data();
- trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
/* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */
- strmake(trx_data->last_commit_pos_file, mysql_bin_log.last_commit_pos_file,
- sizeof(trx_data->last_commit_pos_file)-1);
- trx_data->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset;
+ strmake(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file,
+ sizeof(cache_mngr->last_commit_pos_file)-1);
+ cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset;
trans_register_ha(thd, TRUE, hton);
DBUG_RETURN(err);
}
-/*
- Write a table map to the binary log. If with_annotate != NULL and
- *with_annotate = TRUE write also Annotate_rows before the table map.
- */
+/**
+ This function writes a table map to the binary log.
+ Note that in order to keep the signature uniform with related methods,
+ we use a redundant parameter to indicate whether a transactional table
+ was changed or not.
-int THD::binlog_write_table_map(TABLE *table, bool is_trans,
+ If with_annotate != NULL and
+ *with_annotate = TRUE write also Annotate_rows before the table map.
+
+ @param table a pointer to the table.
+ @param is_transactional @c true indicates a transactional table,
+ otherwise @c false a non-transactional.
+ @return
+ nonzero if an error pops up when writing the table map event.
+*/
+int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
my_bool *with_annotate)
{
int error;
@@ -4370,145 +4840,177 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans,
table->s->table_map_id));
/* Pre-conditions */
- DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open());
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event
- the_event(this, table, table->s->table_map_id, is_trans);
+ the_event(this, table, table->s->table_map_id, is_transactional);
- if (is_trans && binlog_table_maps == 0)
+ if (binlog_table_maps == 0)
binlog_start_trans_and_stmt();
- if ((error= mysql_bin_log.write(&the_event, with_annotate)))
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
+
+ IO_CACHE *file=
+ cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional));
+ if (with_annotate && *with_annotate)
+ {
+ Annotate_rows_log_event anno(current_thd, is_transactional, false);
+ /* Annotate event should be written not more than once */
+ *with_annotate= 0;
+ if ((error= anno.write(file)))
+ DBUG_RETURN(error);
+ }
+ if ((error= the_event.write(file)))
DBUG_RETURN(error);
binlog_table_maps++;
DBUG_RETURN(0);
}
+/**
+ This function retrieves a pending row event from a cache which is
+ specified through the parameter @c is_transactional. Respectively, when it
+ is @c true, the pending event is returned from the transactional cache.
+ Otherwise from the non-transactional cache.
+
+ @param is_transactional @c true indicates a transactional cache,
+ otherwise @c false a non-transactional.
+ @return
+ The row event if any.
+*/
Rows_log_event*
-THD::binlog_get_pending_rows_event() const
+THD::binlog_get_pending_rows_event(bool is_transactional) const
{
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
+ Rows_log_event* rows= NULL;
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
+
/*
- This is less than ideal, but here's the story: If there is no
- trx_data, prepare_pending_rows_event() has never been called
- (since the trx_data is set up there). In that case, we just return
- NULL.
+ This is less than ideal, but here's the story: If there is no cache_mngr,
+ prepare_pending_rows_event() has never been called (since the cache_mngr
+ is set up there). In that case, we just return NULL.
*/
- return trx_data ? trx_data->pending() : NULL;
+ if (cache_mngr)
+ {
+ binlog_cache_data *cache_data=
+ cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional));
+
+ rows= cache_data->pending();
+ }
+ return (rows);
}
+/**
+ This function stores a pending row event into a cache which is specified
+ through the parameter @c is_transactional. Respectively, when it is @c
+ true, the pending event is stored into the transactional cache. Otherwise
+ into the non-transactional cache.
+
+ @param evt a pointer to the row event.
+ @param is_transactional @c true indicates a transactional cache,
+ otherwise @c false a non-transactional.
+*/
void
-THD::binlog_set_pending_rows_event(Rows_log_event* ev)
+THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool is_transactional)
{
if (thd_get_ha_data(this, binlog_hton) == NULL)
binlog_setup_trx_data();
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(this, binlog_hton);
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
+
+ DBUG_ASSERT(cache_mngr);
+
+ binlog_cache_data *cache_data=
+ cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional));
- DBUG_ASSERT(trx_data);
- trx_data->set_pending(ev);
+ cache_data->set_pending(ev);
}
/**
- Remove the pending rows event, discarding any outstanding rows.
-
- If there is no pending rows event available, this is effectively a
+ This function removes the pending rows event, discarding any outstanding
+ rows. If there is no pending rows event available, this is effectively a
no-op.
- */
+
+ @param thd a pointer to the user thread.
+ @param is_transactional @c true indicates a transactional cache,
+ otherwise @c false a non-transactional.
+*/
int
-MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd)
+MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::remove_pending_rows_event");
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+
+ DBUG_ASSERT(cache_mngr);
- DBUG_ASSERT(trx_data);
+ binlog_cache_data *cache_data=
+ cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional));
- if (Rows_log_event* pending= trx_data->pending())
+ if (Rows_log_event* pending= cache_data->pending())
{
delete pending;
- trx_data->set_pending(NULL);
+ cache_data->set_pending(NULL);
}
DBUG_RETURN(0);
}
/*
- Moves the last bunch of rows from the pending Rows event to the binlog
- (either cached binlog if transaction, or disk binlog). Sets a new pending
- event.
+ Moves the last bunch of rows from the pending Rows event to a cache (either
+ transactional cache if is_transaction is @c true, or the non-transactional
+ cache otherwise. Sets a new pending event.
+
+ @param thd a pointer to the user thread.
+ @param evt a pointer to the row event.
+ @param is_transactional @c true indicates a transactional cache,
+ otherwise @c false a non-transactional.
*/
int
MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
- Rows_log_event* event)
+ Rows_log_event* event,
+ bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
DBUG_ASSERT(mysql_bin_log.is_open());
DBUG_PRINT("enter", ("event: 0x%lx", (long) event));
int error= 0;
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ DBUG_ASSERT(cache_mngr);
- DBUG_ASSERT(trx_data);
+ binlog_cache_data *cache_data=
+ cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional));
- DBUG_PRINT("info", ("trx_data->pending(): 0x%lx", (long) trx_data->pending()));
+ DBUG_PRINT("info", ("cache_mngr->pending(): 0x%lx", (long) cache_data->pending()));
- if (Rows_log_event* pending= trx_data->pending())
+ if (Rows_log_event* pending= cache_data->pending())
{
+ IO_CACHE *file= &cache_data->cache_log;
+
/*
- Decide if we should write to the log file directly or to the
- transaction log.
+ Write pending event to the cache.
*/
- if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log))
- {
- /* Write to transaction log/cache. */
- if (pending->write(&trx_data->trans_log))
- {
- set_write_error(thd);
- DBUG_RETURN(1);
- }
- }
- else
+ if (pending->write(file))
{
- /* Write directly to log file. */
- pthread_mutex_lock(&LOCK_log);
- pending->pre_55_writing_direct();
- if (pending->write(&log_file))
- {
- pthread_mutex_unlock(&LOCK_log);
- set_write_error(thd);
- DBUG_RETURN(1);
- }
-
- error= flush_and_sync();
- if (!error)
- {
- signal_update();
- error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
- }
-
- /*
- Take mutex to protect against a reader seeing partial writes of 64-bit
- offset on 32-bit CPUs.
- */
- pthread_mutex_lock(&LOCK_commit_ordered);
- last_commit_pos_offset= my_b_tell(&log_file);
- pthread_mutex_unlock(&LOCK_commit_ordered);
- pthread_mutex_unlock(&LOCK_log);
+ set_write_error(thd, is_transactional);
+ if (check_write_error(thd) && cache_data &&
+ stmt_has_updated_non_trans_table(thd))
+ cache_data->set_incident();
+ DBUG_RETURN(1);
}
delete pending;
}
- thd->binlog_set_pending_rows_event(event);
+ thd->binlog_set_pending_rows_event(event, is_transactional);
DBUG_RETURN(error);
}
@@ -4523,8 +5025,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
{
THD *thd= event_info->thd;
bool error= 1;
- uint16 cache_type;
DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)");
+ binlog_cache_data *cache_data= 0;
+ bool is_trans_cache= FALSE;
+ bool using_trans= event_info->use_trans_cache();
+ bool direct= event_info->use_direct_logging();
if (thd->binlog_evt_union.do_union)
{
@@ -4533,7 +5038,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
We will log the function call to the binary log on function exit
*/
thd->binlog_evt_union.unioned_events= TRUE;
- thd->binlog_evt_union.unioned_events_trans |= event_info->cache_stmt;
+ thd->binlog_evt_union.unioned_events_trans |= using_trans;
DBUG_RETURN(0);
}
@@ -4543,8 +5048,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
this will close all tables on the slave.
*/
bool const end_stmt=
- thd->prelocked_mode && thd->lex->requires_prelocking();
- if (thd->binlog_flush_pending_rows_event(end_stmt))
+ thd->locked_tables_mode && thd->lex->requires_prelocking();
+ if (thd->binlog_flush_pending_rows_event(end_stmt, using_trans))
DBUG_RETURN(error);
/*
@@ -4554,8 +5059,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
*/
if (likely(is_open()))
{
- IO_CACHE *file= &log_file;
- my_off_t my_org_b_tell;
+ my_off_t UNINIT_VAR(my_org_b_tell);
#ifdef HAVE_REPLICATION
/*
In the future we need to add to the following if tests like
@@ -4563,110 +5067,71 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
binlog_[wild_]{do|ignore}_table?" (WL#1049)"
*/
const char *local_db= event_info->get_db();
- if ((!(thd->options & OPTION_BIN_LOG)) ||
+ if ((!(thd->variables.option_bits & OPTION_BIN_LOG)) ||
(thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT &&
thd->lex->sql_command != SQLCOM_SAVEPOINT &&
!binlog_filter->db_ok(local_db)))
- {
DBUG_RETURN(0);
- }
#endif /* HAVE_REPLICATION */
- my_org_b_tell= my_b_tell(file);
+ IO_CACHE *file= NULL;
-#if defined(USING_TRANSACTIONS)
- /*
- Should we write to the binlog cache or to the binlog on disk?
-
- Write to the binlog cache if:
- 1 - a transactional engine/table is updated (stmt_has_updated_trans_table == TRUE);
- 2 - or the event asks for it (cache_stmt == TRUE);
- 3 - or the cache is already not empty (meaning we're in a transaction;
- note that the present event could be about a non-transactional table, but
- still we need to write to the binlog cache in that case to handle updates
- to mixed trans/non-trans table types).
-
- Write to the binlog on disk if only a non-transactional engine is
- updated and:
- 1 - the binlog cache is empty or;
- 2 - --binlog-direct-non-transactional-updates is set and we are about to
- use the statement format. When using the row format (cache_stmt == TRUE).
- */
- if (opt_using_transactions)
+ if (direct)
+ {
+ file= &log_file;
+ my_org_b_tell= my_b_tell(file);
+ mysql_mutex_lock(&LOCK_log);
+ }
+ else
{
if (thd->binlog_setup_trx_data())
goto err;
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
- IO_CACHE *trans_log= &trx_data->trans_log;
- my_off_t trans_log_pos= my_b_tell(trans_log);
- if (event_info->get_cache_stmt() || stmt_has_updated_trans_table(thd) ||
- (!thd->variables.binlog_direct_non_trans_update &&
- trans_log_pos != 0))
- {
- DBUG_PRINT("info", ("Using trans_log: cache: %d, trans_log_pos: %lu",
- event_info->get_cache_stmt(),
- (ulong) trans_log_pos));
- thd->binlog_start_trans_and_stmt();
- file= trans_log;
- }
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+
+ is_trans_cache= use_trans_cache(thd, using_trans);
+ file= cache_mngr->get_binlog_cache_log(is_trans_cache);
+ cache_data= cache_mngr->get_binlog_cache_data(is_trans_cache);
+
+ if (thd->lex->stmt_accessed_non_trans_temp_table())
+ cache_data->set_changes_to_non_trans_temp_table();
+
+ thd->binlog_start_trans_and_stmt();
}
-#endif /* USING_TRANSACTIONS */
DBUG_PRINT("info",("event type: %d",event_info->get_type_code()));
- if (file == &log_file)
- {
- pthread_mutex_lock(&LOCK_log);
- /*
- We did not want to take LOCK_log unless really necessary.
- However, now that we hold LOCK_log, we must check is_open() again, lest
- the log was closed just before.
- */
- if (unlikely(!is_open()))
- {
- pthread_mutex_unlock(&LOCK_log);
- DBUG_RETURN(error);
- }
- event_info->pre_55_writing_direct();
- }
-
- cache_type= event_info->cache_type;
/*
- No check for auto events flag here - this write method should
- never be called if auto-events are enabled
- */
+ No check for auto events flag here - this write method should
+ never be called if auto-events are enabled.
- /*
- 1. Write first log events which describe the 'run environment'
- of the SQL command
+ Write first log events which describe the 'run environment'
+ of the SQL command. If row-based binlogging, Insert_id, Rand
+ and other kind of "setting context" events are not needed.
*/
if (with_annotate && *with_annotate)
{
DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT);
- Annotate_rows_log_event anno(thd, cache_type);
+ Annotate_rows_log_event anno(thd, using_trans, direct);
/* Annotate event should be written not more than once */
*with_annotate= 0;
if (anno.write(file))
goto err;
}
- /*
- If row-based binlogging, Insert_id, Rand and other kind of "setting
- context" events are not needed.
- */
+ if (thd)
{
- if (!thd->current_stmt_binlog_row_based)
+ if (!thd->is_current_stmt_binlog_format_row())
{
if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt)
{
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
thd->first_successful_insert_id_in_prev_stmt_for_binlog,
- cache_type);
+ using_trans, direct);
if (e.write(file))
- goto err_unlock;
+ goto err;
}
if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
{
@@ -4675,16 +5140,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
nb_elements()));
Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT,
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
- minimum(), cache_type);
+ minimum(), using_trans, direct);
if (e.write(file))
- goto err_unlock;
+ goto err;
}
if (thd->rand_used)
{
Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2,
- cache_type);
+ using_trans, direct);
if (e.write(file))
- goto err_unlock;
+ goto err;
}
if (thd->user_var_events.elements)
{
@@ -4692,56 +5157,85 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
{
BINLOG_USER_VAR_EVENT *user_var_event;
get_dynamic(&thd->user_var_events,(uchar*) &user_var_event, i);
+
+ /* setting flags for user var log event */
+ uchar flags= User_var_log_event::UNDEF_F;
+ if (user_var_event->unsigned_flag)
+ flags|= User_var_log_event::UNSIGNED_F;
+
User_var_log_event e(thd, user_var_event->user_var_event->name.str,
user_var_event->user_var_event->name.length,
user_var_event->value,
user_var_event->length,
user_var_event->type,
user_var_event->charset_number,
- cache_type);
+ flags,
+ using_trans,
+ direct);
if (e.write(file))
- goto err_unlock;
+ goto err;
}
}
}
}
- /* Write the SQL command */
- if (event_info->write(file) ||
+ /*
+ Write the event.
+ */
+ if (event_info->write(file) ||
DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0))
- goto err_unlock;
+ goto err;
- if (file == &log_file) // we are writing to the real log (disk)
+ error= 0;
+err:
+ if (direct)
{
- ulonglong data_written= (my_b_tell(file) - my_org_b_tell);
- status_var_add(thd->status_var.binlog_bytes_written, data_written);
+ my_off_t offset= my_b_tell(file);
+ bool check_purge= false;
- if (flush_and_sync())
- goto err_unlock;
- signal_update();
- if ((error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED)))
- goto err_unlock;
-
- }
- error=0;
+ if (!error)
+ {
+ bool synced;
+
+ if ((error= flush_and_sync(&synced)))
+ {
+ }
+ else if ((error= RUN_HOOK(binlog_storage, after_flush,
+ (thd, log_file_name, file->pos_in_file, synced))))
+ {
+ sql_print_error("Failed to run 'after_flush' hooks");
+ }
+ else
+ {
+ signal_update();
+ if ((error= rotate(false, &check_purge)))
+ check_purge= false;
+ }
+ }
+
+ status_var_add(thd->status_var.binlog_bytes_written,
+ offset - my_org_b_tell);
-err_unlock:
- if (file == &log_file)
- {
- my_off_t offset= my_b_tell(&log_file);
/*
Take mutex to protect against a reader seeing partial writes of 64-bit
offset on 32-bit CPUs.
*/
- pthread_mutex_lock(&LOCK_commit_ordered);
+ mysql_mutex_lock(&LOCK_commit_ordered);
last_commit_pos_offset= offset;
- pthread_mutex_unlock(&LOCK_commit_ordered);
- pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_log);
+
+ if (check_purge)
+ purge();
}
-err:
if (error)
- set_write_error(thd);
+ {
+ set_write_error(thd, is_trans_cache);
+ if (check_write_error(thd) && cache_data &&
+ stmt_has_updated_non_trans_table(thd))
+ cache_data->set_incident();
+ }
}
DBUG_RETURN(error);
@@ -4773,7 +5267,7 @@ bool LOGGER::log_command(THD *thd, enum enum_server_command command)
*/
if (*general_log_handler_list && (what_to_log & (1L << (uint) command)))
{
- if ((thd->options & OPTION_LOG_OFF)
+ if ((thd->variables.option_bits & OPTION_LOG_OFF)
#ifndef NO_EMBEDDED_ACCESS_CHECKS
&& (sctx->master_access & SUPER_ACL)
#endif
@@ -4818,25 +5312,29 @@ bool general_log_write(THD *thd, enum enum_server_command command,
}
/**
+ The method executes rotation when LOCK_log is already acquired
+ by the caller.
+
+ @param force_rotate caller can request the log rotation
+ @param check_purge is set to true if rotation took place
+
@note
If rotation fails, for instance the server was unable
to create a new log file, we still try to write an
incident event to the current log.
@retval
- nonzero - error
+ nonzero - error in rotating routine.
*/
-int MYSQL_BIN_LOG::rotate_and_purge(uint flags)
+int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
{
int error= 0;
- DBUG_ENTER("MYSQL_BIN_LOG::rotate_and_purge");
-#ifdef HAVE_REPLICATION
- bool check_purge= false;
-#endif
- if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED))
- pthread_mutex_lock(&LOCK_log);
- if ((flags & RP_FORCE_ROTATE) ||
- (my_b_tell(&log_file) >= (my_off_t) max_size))
+ DBUG_ENTER("MYSQL_BIN_LOG::rotate");
+
+ //todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log);
+ *check_purge= false;
+
+ if (force_rotate || (my_b_tell(&log_file) >= (my_off_t) max_size))
{
if ((error= new_file_without_locking()))
/**
@@ -4849,37 +5347,73 @@ int MYSQL_BIN_LOG::rotate_and_purge(uint flags)
to the current log.
*/
if (!write_incident_already_locked(current_thd))
- flush_and_sync();
+ flush_and_sync(0);
-#ifdef HAVE_REPLICATION
- check_purge= true;
-#endif
- if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE)
- checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF; // done
+ *check_purge= true;
}
- if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED))
- pthread_mutex_unlock(&LOCK_log);
+ DBUG_RETURN(error);
+}
+
+/**
+ The method executes logs purging routine.
+
+ @retval
+ nonzero - error in rotating routine.
+*/
+void MYSQL_BIN_LOG::purge()
+{
+ mysql_mutex_assert_not_owner(&LOCK_log);
#ifdef HAVE_REPLICATION
- /*
- NOTE: Run purge_logs wo/ holding LOCK_log
- as it otherwise will deadlock in ndbcluster_binlog_index_purge_file
- */
- if (!error && check_purge && expire_logs_days)
+ if (expire_logs_days)
{
+ DEBUG_SYNC(current_thd, "at_purge_logs_before_date");
time_t purge_time= my_time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0)
+ {
purge_logs_before_date(purge_time);
+ }
+ DEBUG_SYNC(current_thd, "after_purge_logs_before_date");
}
#endif
+}
+
+/**
+ The method is a shortcut of @c rotate() and @c purge().
+ LOCK_log is acquired prior to rotate and is released after it.
+
+ @param force_rotate caller can request the log rotation
+
+ @retval
+ nonzero - error in rotating routine.
+*/
+int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate)
+{
+ int error= 0;
+ DBUG_ENTER("MYSQL_BIN_LOG::rotate_and_purge");
+ bool check_purge= false;
+
+ //todo: fix the macro def and restore safe_mutex_assert_not_owner(&LOCK_log);
+ mysql_mutex_lock(&LOCK_log);
+ if ((error= rotate(force_rotate, &check_purge)))
+ check_purge= false;
+ /*
+ NOTE: Run purge_logs wo/ holding LOCK_log because it does not need
+ the mutex. Otherwise causes various deadlocks.
+ */
+ mysql_mutex_unlock(&LOCK_log);
+
+ if (check_purge)
+ purge();
+
DBUG_RETURN(error);
}
uint MYSQL_BIN_LOG::next_file_id()
{
uint res;
- pthread_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_log);
res = file_id++;
- pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_log);
return res;
}
@@ -4930,7 +5464,7 @@ uint MYSQL_BIN_LOG::next_file_id()
int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
{
- safe_mutex_assert_owner(&LOCK_log);
+ mysql_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
return ER_ERROR_ON_WRITE;
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
@@ -4967,7 +5501,6 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
do
{
-
/*
if we only got a partial header in the last iteration,
get the other half now and process a full header.
@@ -5157,9 +5690,9 @@ int query_error_code(THD *thd, bool not_killed)
if (not_killed || (killed_mask_hard(thd->killed) == KILL_BAD_DATA))
{
- error= thd->is_error() ? thd->main_da.sql_errno() : 0;
+ error= thd->is_error() ? thd->stmt_da->sql_errno() : 0;
- /* thd->main_da.sql_errno() might be ER_SERVER_SHUTDOWN or
+ /* thd->stmt_da->sql_errno() might be ER_SERVER_SHUTDOWN or
ER_QUERY_INTERRUPTED, So here we need to make sure that error
is not set to these errors when specified not_killed by the
caller.
@@ -5188,7 +5721,6 @@ bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd)
if (likely(is_open()))
{
- ev.pre_55_writing_direct();
error= ev.write(&log_file);
status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
}
@@ -5201,27 +5733,33 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
{
uint error= 0;
my_off_t offset;
+ bool check_purge= false;
DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
- pthread_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_log);
if (likely(is_open()))
{
if (!(error= write_incident_already_locked(thd)) &&
- !(error= flush_and_sync()))
+ !(error= flush_and_sync(0)))
{
signal_update();
- error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
+ if ((error= rotate(false, &check_purge)))
+ check_purge= false;
}
+
offset= my_b_tell(&log_file);
/*
Take mutex to protect against a reader seeing partial writes of 64-bit
offset on 32-bit CPUs.
*/
- pthread_mutex_lock(&LOCK_commit_ordered);
+ mysql_mutex_lock(&LOCK_commit_ordered);
last_commit_pos_offset= offset;
- pthread_mutex_unlock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_log);
+
+ if (check_purge)
+ purge();
}
- pthread_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
@@ -5251,16 +5789,21 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
*/
bool
-MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
- Log_event *end_ev, bool all)
+MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
+ binlog_cache_mngr *cache_mngr,
+ Log_event *end_ev, bool all,
+ bool using_stmt_cache,
+ bool using_trx_cache)
{
group_commit_entry entry;
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
entry.thd= thd;
- entry.trx_data= trx_data;
+ entry.cache_mngr= cache_mngr;
entry.error= 0;
entry.all= all;
+ entry.using_stmt_cache= using_stmt_cache;
+ entry.using_trx_cache= using_trx_cache;
/*
Log "BEGIN" at the beginning of every transaction. Here, a transaction is
@@ -5272,10 +5815,12 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
Due to group commit the actual writing to binlog may happen in a different
thread.
*/
- Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
+ Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE,
+ TRUE, 0);
entry.begin_event= &qinfo;
entry.end_event= end_ev;
- if (trx_data->has_incident())
+ if (cache_mngr->stmt_cache.has_incident() ||
+ cache_mngr->trx_cache.has_incident())
{
Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg);
entry.incident_event= &inc_ev;
@@ -5300,18 +5845,18 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
*/
entry->thd->clear_wakeup_ready();
- pthread_mutex_lock(&LOCK_prepare_ordered);
+ mysql_mutex_lock(&LOCK_prepare_ordered);
group_commit_entry *orig_queue= group_commit_queue;
entry->next= orig_queue;
group_commit_queue= entry;
- if (entry->trx_data->using_xa)
+ if (entry->cache_mngr->using_xa)
{
DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
run_prepare_ordered(entry->thd, entry->all);
DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
}
- pthread_mutex_unlock(&LOCK_prepare_ordered);
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered");
/*
@@ -5327,21 +5872,21 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
/* For the leader, trx_group_commit_leader() already took the lock. */
if (orig_queue != NULL)
- pthread_mutex_lock(&LOCK_commit_ordered);
+ mysql_mutex_lock(&LOCK_commit_ordered);
DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered");
++num_commits;
- if (entry->trx_data->using_xa && !entry->error)
+ if (entry->cache_mngr->using_xa && !entry->error)
run_commit_ordered(entry->thd, entry->all);
group_commit_entry *next= entry->next;
if (!next)
{
group_commit_queue_busy= FALSE;
- pthread_cond_signal(&COND_queue_busy);
+ mysql_cond_signal(&COND_queue_busy);
DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered");
}
- pthread_mutex_unlock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
if (next)
{
@@ -5359,7 +5904,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
break;
case ER_ERROR_ON_READ:
my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH),
- entry->trx_data->trans_log.file_name, entry->commit_errno);
+ entry->error_cache->file_name, entry->commit_errno);
break;
default:
/*
@@ -5377,7 +5922,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
we need to mark it as not needed for recovery (unlog() is not called
for a transaction if log_xid() fails).
*/
- if (entry->trx_data->using_xa && entry->trx_data->xa_xid)
+ if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid)
mark_xid_done();
return 1;
@@ -5386,7 +5931,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
/*
Do binlog group commit as the lead thread.
- This must be called when this thread/transaction is queued at the start of
+ This must be called when this statement/transaction is queued at the start of
the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group
commit all the transactions in the queue (more may have entered while waiting
for LOCK_log). After commit is done, all other threads in the queue will be
@@ -5397,42 +5942,42 @@ void
MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{
uint xid_count= 0;
- uint write_count= 0;
- my_off_t commit_offset;
+ my_off_t UNINIT_VAR(commit_offset);
group_commit_entry *current;
group_commit_entry *last_in_queue;
- DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
- LINT_INIT(commit_offset);
-
- /*
- Lock the LOCK_log(), and once we get it, collect any additional writes
- that queued up while we were waiting.
- */
- VOID(pthread_mutex_lock(&LOCK_log));
- DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
-
- pthread_mutex_lock(&LOCK_prepare_ordered);
- current= group_commit_queue;
- group_commit_queue= NULL;
- pthread_mutex_unlock(&LOCK_prepare_ordered);
-
- /* As the queue is in reverse order of entering, reverse it. */
group_commit_entry *queue= NULL;
- last_in_queue= current;
- while (current)
- {
- group_commit_entry *next= current->next;
- current->next= queue;
- queue= current;
- current= next;
- }
- DBUG_ASSERT(leader == queue /* the leader should be first in queue */);
+ bool check_purge= false;
+ DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
- /* Now we have in queue the list of transactions to be committed in order. */
DBUG_ASSERT(is_open());
if (likely(is_open())) // Should always be true
{
/*
+ Lock the LOCK_log(), and once we get it, collect any additional writes
+ that queued up while we were waiting.
+ */
+ mysql_mutex_lock(&LOCK_log);
+ DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
+
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ current= group_commit_queue;
+ group_commit_queue= NULL;
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+
+ /* As the queue is in reverse order of entering, reverse it. */
+ last_in_queue= current;
+ while (current)
+ {
+ group_commit_entry *next= current->next;
+ current->next= queue;
+ queue= current;
+ current= next;
+ }
+ DBUG_ASSERT(leader == queue /* the leader should be first in queue */);
+
+ /* Now we have in queue the list of transactions to be committed in order. */
+
+ /*
Commit every transaction in the queue.
Note that we are doing this in a different thread than the one running
@@ -5444,46 +5989,61 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
for (current= queue; current != NULL; current= current->next)
{
- binlog_trx_data *trx_data= current->trx_data;
- IO_CACHE *cache= &trx_data->trans_log;
+ binlog_cache_mngr *cache_mngr= current->cache_mngr;
/*
- We only bother to write to the binary log if there is anything
- to write.
+ We already checked before that at least one cache is non-empty; if both
+ are empty we would have skipped calling into here.
*/
- if (my_b_tell(cache) > 0)
- {
- if ((current->error= write_transaction(current)))
- current->commit_errno= errno;
+ DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty());
- write_count++;
- }
+ current->error= write_transaction_or_stmt(current);
- strmake(trx_data->last_commit_pos_file, log_file_name,
- sizeof(trx_data->last_commit_pos_file)-1);
+ strmake(cache_mngr->last_commit_pos_file, log_file_name,
+ sizeof(cache_mngr->last_commit_pos_file)-1);
commit_offset= my_b_write_tell(&log_file);
- trx_data->last_commit_pos_offset= commit_offset;
- if (trx_data->using_xa && trx_data->xa_xid)
+ cache_mngr->last_commit_pos_offset= commit_offset;
+ if (cache_mngr->using_xa && cache_mngr->xa_xid)
xid_count++;
}
- if (write_count > 0)
+ bool synced= 0;
+ if (flush_and_sync(&synced))
{
- if (flush_and_sync())
+ for (current= queue; current != NULL; current= current->next)
{
- for (current= queue; current != NULL; current= current->next)
+ if (!current->error)
{
- if (!current->error)
- {
- current->error= ER_ERROR_ON_WRITE;
- current->commit_errno= errno;
- }
+ current->error= ER_ERROR_ON_WRITE;
+ current->commit_errno= errno;
+ current->error_cache= NULL;
}
}
- else
+ }
+ else
+ {
+ bool any_error= false;
+ bool all_error= true;
+ for (current= queue; current != NULL; current= current->next)
{
- signal_update();
+ if (!current->error &&
+ RUN_HOOK(binlog_storage, after_flush,
+ (current->thd, log_file_name,
+ current->cache_mngr->last_commit_pos_offset, synced)))
+ {
+ current->error= ER_ERROR_ON_WRITE;
+ current->commit_errno= -1;
+ current->error_cache= NULL;
+ any_error= true;
+ }
+ else
+ all_error= false;
}
+
+ if (any_error)
+ sql_print_error("Failed to run 'after_flush' hooks");
+ if (!all_error)
+ signal_update();
}
/*
@@ -5500,7 +6060,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
}
else
{
- if (rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED))
+ if (rotate(false, &check_purge))
{
/*
If we fail to rotate, which thread should get the error?
@@ -5509,12 +6069,13 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
last_in_queue->error= ER_ERROR_ON_WRITE;
last_in_queue->commit_errno= errno;
+ check_purge= false;
}
}
}
DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
- pthread_mutex_lock(&LOCK_commit_ordered);
+ mysql_mutex_lock(&LOCK_commit_ordered);
last_commit_pos_offset= commit_offset;
/*
We cannot unlock LOCK_log until we have locked LOCK_commit_ordered;
@@ -5522,7 +6083,11 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
messing up the order of commit_ordered() calls. But as soon as
LOCK_commit_ordered is obtained, we can let the next group commit start.
*/
- pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_log);
+
+ if (check_purge)
+ purge();
+
DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log");
++num_group_commits;
@@ -5537,7 +6102,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
while (group_commit_queue_busy)
- pthread_cond_wait(&COND_queue_busy, &LOCK_commit_ordered);
+ mysql_cond_wait(&COND_queue_busy, &LOCK_commit_ordered);
group_commit_queue_busy= TRUE;
/* Note that we return with LOCK_commit_ordered locked! */
@@ -5555,7 +6120,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered");
++num_commits;
- if (current->trx_data->using_xa && !current->error)
+ if (current->cache_mngr->using_xa && !current->error)
run_commit_ordered(current->thd, current->all);
/*
@@ -5568,63 +6133,91 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
current= next;
}
DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered");
- pthread_mutex_unlock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
DBUG_VOID_RETURN;
}
+
int
-MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry)
+MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
{
- binlog_trx_data *trx_data= entry->trx_data;
- IO_CACHE *cache= &trx_data->trans_log;
+ binlog_cache_mngr *mngr= entry->cache_mngr;
- entry->begin_event->pre_55_writing_direct();
if (entry->begin_event->write(&log_file))
return ER_ERROR_ON_WRITE;
status_var_add(entry->thd->status_var.binlog_bytes_written,
entry->begin_event->data_written);
- DBUG_EXECUTE_IF("crash_before_writing_xid",
- {
- if ((write_cache(entry->thd, cache)))
- DBUG_PRINT("info", ("error writing binlog cache"));
- else
- flush_and_sync();
+ if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
+ write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
+ {
+ entry->error_cache= &mngr->stmt_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_WRITE;
+ }
+
+ if (entry->using_trx_cache && !mngr->trx_cache.empty())
+ {
+ DBUG_EXECUTE_IF("crash_before_writing_xid",
+ {
+ if ((write_cache(entry->thd,
+ mngr->get_binlog_cache_log(TRUE))))
+ DBUG_PRINT("info", ("error writing binlog cache"));
+ else
+ flush_and_sync(0);
- DBUG_PRINT("info", ("crashing before writing xid"));
- DBUG_SUICIDE();
- });
+ DBUG_PRINT("info", ("crashing before writing xid"));
+ DBUG_SUICIDE();
+ });
- if (write_cache(entry->thd, cache))
- return ER_ERROR_ON_WRITE;
+ if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE)))
+ {
+ entry->error_cache= &mngr->trx_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_WRITE;
+ }
+ }
- entry->end_event->pre_55_writing_direct();
if (entry->end_event->write(&log_file))
+ {
+ entry->error_cache= NULL;
+ entry->commit_errno= errno;
return ER_ERROR_ON_WRITE;
+ }
status_var_add(entry->thd->status_var.binlog_bytes_written,
entry->end_event->data_written);
if (entry->incident_event)
{
- entry->incident_event->pre_55_writing_direct();
if (entry->incident_event->write(&log_file))
+ {
+ entry->error_cache= NULL;
+ entry->commit_errno= errno;
return ER_ERROR_ON_WRITE;
+ }
}
- if (cache->error) // Error on read
+ if (mngr->get_binlog_cache_log(FALSE)->error) // Error on read
+ {
+ entry->error_cache= &mngr->stmt_cache.cache_log;
+ entry->commit_errno= errno;
return ER_ERROR_ON_READ;
+ }
+ if (mngr->get_binlog_cache_log(TRUE)->error) // Error on read
+ {
+ entry->error_cache= &mngr->trx_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_READ;
+ }
return 0;
}
/**
- Wait until we get a signal that the binary log has been updated.
+ Wait until we get a signal that the relay log has been updated.
@param thd Thread variable
- @param is_slave If 0, the caller is the Binlog_dump thread from master;
- if 1, the caller is the SQL thread from the slave. This
- influences only thd->proc_info.
@note
One must have a lock on LOCK_log before calling this function.
@@ -5632,22 +6225,50 @@ MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry)
THD::enter_cond() (see NOTES in sql_class.h).
*/
-void MYSQL_BIN_LOG::wait_for_update(THD* thd, bool is_slave)
+void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd)
{
const char *old_msg;
- DBUG_ENTER("wait_for_update");
+ DBUG_ENTER("wait_for_update_relay_log");
old_msg= thd->enter_cond(&update_cond, &LOCK_log,
- is_slave ?
- "Has read all relay log; waiting for the slave I/O "
- "thread to update it" :
- "Has sent all binlog to slave; waiting for binlog "
- "to be updated");
- pthread_cond_wait(&update_cond, &LOCK_log);
+ "Slave has read all relay log; "
+ "waiting for the slave I/O "
+ "thread to update it" );
+ mysql_cond_wait(&update_cond, &LOCK_log);
thd->exit_cond(old_msg);
DBUG_VOID_RETURN;
}
+/**
+ Wait until we get a signal that the binary log has been updated.
+ Applies to master only.
+
+ NOTES
+ @param[in] thd a THD struct
+ @param[in] timeout a pointer to a timespec;
+ NULL means to wait w/o timeout.
+ @retval 0 if got signalled on update
+ @retval non-0 if wait timeout elapsed
+ @note
+ LOCK_log must be taken before calling this function.
+ LOCK_log is being released while the thread is waiting.
+ LOCK_log is released by the caller.
+*/
+
+int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
+ const struct timespec *timeout)
+{
+ int ret= 0;
+ DBUG_ENTER("wait_for_update_bin_log");
+
+ if (!timeout)
+ mysql_cond_wait(&update_cond, &LOCK_log);
+ else
+ ret= mysql_cond_timedwait(&update_cond, &LOCK_log,
+ const_cast<struct timespec *>(timeout));
+ DBUG_RETURN(ret);
+}
+
/**
Close the log file.
@@ -5679,7 +6300,6 @@ void MYSQL_BIN_LOG::close(uint exiting)
(uint8) relay_log_checksum_alg : (uint8) binlog_checksum_options;
DBUG_ASSERT(!is_relay_log ||
relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
- s.pre_55_writing_direct();
s.write(&log_file);
bytes_written+= s.data_written;
signal_update();
@@ -5690,16 +6310,16 @@ void MYSQL_BIN_LOG::close(uint exiting)
if (log_file.type == WRITE_CACHE && log_type == LOG_BIN)
{
my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
- my_off_t org_position= my_tell(log_file.file, MYF(0));
+ my_off_t org_position= mysql_file_tell(log_file.file, MYF(0));
uchar flags= 0; // clearing LOG_EVENT_BINLOG_IN_USE_F
- my_pwrite(log_file.file, &flags, 1, offset, MYF(0));
+ mysql_file_pwrite(log_file.file, &flags, 1, offset, MYF(0));
/*
Restore position so that anything we have in the IO_cache is written
to the correct position.
- We need the seek here, as my_pwrite() is not guaranteed to keep the
+ We need the seek here, as mysql_file_pwrite() is not guaranteed to keep the
original position on system that doesn't support pwrite().
*/
- my_seek(log_file.file, org_position, MY_SEEK_SET, MYF(0));
+ mysql_file_seek(log_file.file, org_position, MY_SEEK_SET, MYF(0));
}
/* this will cleanup IO_CACHE, sync and close the file */
@@ -5714,14 +6334,15 @@ void MYSQL_BIN_LOG::close(uint exiting)
if ((exiting & LOG_CLOSE_INDEX) && my_b_inited(&index_file))
{
end_io_cache(&index_file);
- if (my_close(index_file.file, MYF(0)) < 0 && ! write_error)
+ if (mysql_file_close(index_file.file, MYF(0)) < 0 && ! write_error)
{
write_error= 1;
sql_print_error(ER(ER_ERROR_ON_WRITE), index_file_name, errno);
}
}
log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED;
- safeFree(name);
+ my_free(name);
+ name= NULL;
DBUG_VOID_RETURN;
}
@@ -5736,10 +6357,10 @@ void MYSQL_BIN_LOG::set_max_size(ulong max_size_arg)
it's like if the SET command was never run.
*/
DBUG_ENTER("MYSQL_BIN_LOG::set_max_size");
- pthread_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_log);
if (is_open())
max_size= max_size_arg;
- pthread_mutex_unlock(&LOCK_log);
+ mysql_mutex_unlock(&LOCK_log);
DBUG_VOID_RETURN;
}
@@ -5758,11 +6379,11 @@ void MYSQL_BIN_LOG::set_max_size(ulong max_size_arg)
@retval
1 String is a number
@retval
- 0 Error
+ 0 String is not a number
*/
static bool test_if_number(register const char *str,
- long *res, bool allow_wildcards)
+ ulong *res, bool allow_wildcards)
{
reg2 int flag;
const char *start;
@@ -5858,10 +6479,10 @@ bool flush_error_log()
bool result= 0;
if (opt_error_log)
{
- VOID(pthread_mutex_lock(&LOCK_error_log));
+ mysql_mutex_lock(&LOCK_error_log);
if (redirect_std_streams(log_error_file))
result= 1;
- VOID(pthread_mutex_unlock(&LOCK_error_log));
+ mysql_mutex_unlock(&LOCK_error_log);
}
return result;
}
@@ -5869,11 +6490,12 @@ bool flush_error_log()
void MYSQL_BIN_LOG::signal_update()
{
DBUG_ENTER("MYSQL_BIN_LOG::signal_update");
- pthread_cond_broadcast(&update_cond);
+ signal_cnt++;
+ mysql_cond_broadcast(&update_cond);
DBUG_VOID_RETURN;
}
-#ifdef __NT__
+#ifdef _WIN32
static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
size_t length, size_t buffLen)
{
@@ -5906,7 +6528,7 @@ static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
DBUG_VOID_RETURN;
}
-#endif /* __NT__ */
+#endif /* _WIN32 */
#ifndef EMBEDDED_LIBRARY
@@ -5919,7 +6541,7 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer,
DBUG_ENTER("print_buffer_to_file");
DBUG_PRINT("enter",("buffer: %s", buffer));
- VOID(pthread_mutex_lock(&LOCK_error_log));
+ mysql_mutex_lock(&LOCK_error_log);
skr= my_time(0);
localtime_r(&skr, &tm_tmp);
@@ -5938,7 +6560,7 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer,
fflush(stderr);
- VOID(pthread_mutex_unlock(&LOCK_error_log));
+ mysql_mutex_unlock(&LOCK_error_log);
DBUG_VOID_RETURN;
}
@@ -5967,7 +6589,7 @@ int vprint_msg_to_log(enum loglevel level, const char *format, va_list args)
length= my_vsnprintf(buff, sizeof(buff), format, args);
print_buffer_to_file(level, buff, length);
-#ifdef __NT__
+#ifdef _WIN32
print_buffer_to_nt_eventlog(level, buff, length, sizeof(buff));
#endif
@@ -6016,35 +6638,12 @@ void sql_print_information(const char *format, ...)
void
-TC_init()
-{
- my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW,
- "LOCK_prepare_ordered", MYF(0));
- my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW,
- "LOCK_commit_ordered", MYF(0));
- mutexes_inited= TRUE;
-}
-
-
-void
-TC_destroy()
-{
- if (mutexes_inited)
- {
- pthread_mutex_destroy(&LOCK_prepare_ordered);
- pthread_mutex_destroy(&LOCK_commit_ordered);
- mutexes_inited= FALSE;
- }
-}
-
-
-void
TC_LOG::run_prepare_ordered(THD *thd, bool all)
{
Ha_trx_info *ha_info=
all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
- safe_mutex_assert_owner(&LOCK_prepare_ordered);
+ mysql_mutex_assert_owner(&LOCK_prepare_ordered);
for (; ha_info; ha_info= ha_info->next())
{
handlerton *ht= ha_info->ht();
@@ -6061,7 +6660,7 @@ TC_LOG::run_commit_ordered(THD *thd, bool all)
Ha_trx_info *ha_info=
all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
- safe_mutex_assert_owner(&LOCK_commit_ordered);
+ mysql_mutex_assert_owner(&LOCK_commit_ordered);
for (; ha_info; ha_info= ha_info->next())
{
handlerton *ht= ha_info->ht();
@@ -6084,7 +6683,7 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
if (need_prepare_ordered)
{
- pthread_mutex_lock(&LOCK_prepare_ordered);
+ mysql_mutex_lock(&LOCK_prepare_ordered);
run_prepare_ordered(thd, all);
if (need_commit_ordered)
{
@@ -6099,7 +6698,7 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
commit_ordered_queue= &entry;
is_group_commit_leader= (previous_queue == NULL);
}
- pthread_mutex_unlock(&LOCK_prepare_ordered);
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
}
cookie= 0;
@@ -6121,9 +6720,9 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
if (is_group_commit_leader)
{
/* The first in queue starts the ball rolling. */
- pthread_mutex_lock(&LOCK_prepare_ordered);
+ mysql_mutex_lock(&LOCK_prepare_ordered);
while (commit_ordered_queue_busy)
- pthread_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered);
+ mysql_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered);
commit_entry *queue= commit_ordered_queue;
commit_ordered_queue= NULL;
/*
@@ -6131,7 +6730,7 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
next.
*/
commit_ordered_queue_busy= true;
- pthread_mutex_unlock(&LOCK_prepare_ordered);
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
/* Reverse the queue list so we get correct order. */
commit_entry *prev= NULL;
@@ -6154,9 +6753,9 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
/* Only run commit_ordered() if log_xid was successful. */
if (cookie)
{
- pthread_mutex_lock(&LOCK_commit_ordered);
+ mysql_mutex_lock(&LOCK_commit_ordered);
run_commit_ordered(thd, all);
- pthread_mutex_unlock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
}
if (need_prepare_ordered)
@@ -6168,10 +6767,10 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
}
else
{
- pthread_mutex_lock(&LOCK_prepare_ordered);
+ mysql_mutex_lock(&LOCK_prepare_ordered);
commit_ordered_queue_busy= false;
- pthread_cond_signal(&COND_queue_busy);
- pthread_mutex_unlock(&LOCK_prepare_ordered);
+ mysql_cond_signal(&COND_queue_busy);
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
}
}
}
@@ -6242,17 +6841,18 @@ int TC_LOG_MMAP::open(const char *opt_name)
DBUG_ASSERT(TC_LOG_PAGE_SIZE % tc_log_page_size == 0);
fn_format(logname,opt_name,mysql_data_home,"",MY_UNPACK_FILENAME);
- if ((fd= my_open(logname, O_RDWR, MYF(0))) < 0)
+ if ((fd= mysql_file_open(key_file_tclog, logname, O_RDWR, MYF(0))) < 0)
{
if (my_errno != ENOENT)
goto err;
if (using_heuristic_recover())
return 1;
- if ((fd= my_create(logname, CREATE_MODE, O_RDWR, MYF(MY_WME))) < 0)
+ if ((fd= mysql_file_create(key_file_tclog, logname, CREATE_MODE,
+ O_RDWR, MYF(MY_WME))) < 0)
goto err;
inited=1;
file_length= opt_tc_log_size;
- if (my_chsize(fd, file_length, 0, MYF(MY_WME)))
+ if (mysql_file_chsize(fd, file_length, 0, MYF(MY_WME)))
goto err;
}
else
@@ -6266,7 +6866,7 @@ int TC_LOG_MMAP::open(const char *opt_name)
"--tc-heuristic-recover is used");
goto err;
}
- file_length= my_seek(fd, 0L, MY_SEEK_END, MYF(MY_WME+MY_FAE));
+ file_length= mysql_file_seek(fd, 0L, MY_SEEK_END, MYF(MY_WME+MY_FAE));
if (file_length == MY_FILEPOS_ERROR || file_length % tc_log_page_size)
goto err;
}
@@ -6290,9 +6890,9 @@ int TC_LOG_MMAP::open(const char *opt_name)
{
pg->next=pg+1;
pg->waiters=0;
- pg->state=POOL;
- pthread_mutex_init(&pg->lock, MY_MUTEX_INIT_FAST);
- pthread_cond_init (&pg->cond, 0);
+ pg->state=PS_POOL;
+ mysql_mutex_init(key_PAGE_lock, &pg->lock, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_PAGE_cond, &pg->cond, 0);
pg->ptr= pg->start=(my_xid *)(data + i*tc_log_page_size);
pg->size=pg->free=tc_log_page_size/sizeof(my_xid);
pg->end=pg->start + pg->size;
@@ -6311,12 +6911,12 @@ int TC_LOG_MMAP::open(const char *opt_name)
my_msync(fd, data, tc_log_page_size, MS_SYNC);
inited=5;
- pthread_mutex_init(&LOCK_sync, MY_MUTEX_INIT_FAST);
- pthread_mutex_init(&LOCK_active, MY_MUTEX_INIT_FAST);
- pthread_mutex_init(&LOCK_pool, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&COND_active, 0);
- pthread_cond_init(&COND_pool, 0);
- pthread_cond_init(&COND_queue_busy, 0);
+ mysql_mutex_init(key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_active, &LOCK_active, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_pool, &LOCK_pool, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_active, &COND_active, 0);
+ mysql_cond_init(key_COND_pool, &COND_pool, 0);
+ mysql_cond_init(key_TC_LOG_MMAP_COND_queue_busy, &COND_queue_busy, 0);
inited=6;
@@ -6351,7 +6951,7 @@ void TC_LOG_MMAP::get_active_from_pool()
PAGE **p, **best_p=0;
int best_free;
- pthread_mutex_lock(&LOCK_pool);
+ mysql_mutex_lock(&LOCK_pool);
do
{
@@ -6371,16 +6971,16 @@ void TC_LOG_MMAP::get_active_from_pool()
}
while ((*best_p == 0 || best_free == 0) && overflow());
- safe_mutex_assert_owner(&LOCK_active);
+ mysql_mutex_assert_owner(&LOCK_active);
active=*best_p;
if ((*best_p)->next) // unlink the page from the pool
*best_p=(*best_p)->next;
else
pool_last=*best_p;
- pthread_mutex_unlock(&LOCK_pool);
+ mysql_mutex_unlock(&LOCK_pool);
- pthread_mutex_lock(&active->lock);
+ mysql_mutex_lock(&active->lock);
if (active->free == active->size) // we've chosen an empty page
{
tc_log_cur_pages_used++;
@@ -6400,7 +7000,7 @@ int TC_LOG_MMAP::overflow()
let's check the behaviour of tc_log_page_waits first
*/
tc_log_page_waits++;
- pthread_cond_wait(&COND_pool, &LOCK_pool);
+ mysql_cond_wait(&COND_pool, &LOCK_pool);
return 1; // always return 1
}
@@ -6437,7 +7037,7 @@ int TC_LOG_MMAP::log_one_transaction(my_xid xid)
PAGE *p;
ulong cookie;
- pthread_mutex_lock(&LOCK_active);
+ mysql_mutex_lock(&LOCK_active);
/*
if the active page is full - just wait...
@@ -6447,13 +7047,13 @@ int TC_LOG_MMAP::log_one_transaction(my_xid xid)
unlog() does not signal COND_active.
*/
while (unlikely(active && active->free == 0))
- pthread_cond_wait(&COND_active, &LOCK_active);
+ mysql_cond_wait(&COND_active, &LOCK_active);
/* no active page ? take one from the pool */
if (active == 0)
get_active_from_pool();
else
- pthread_mutex_lock(&active->lock);
+ mysql_mutex_lock(&active->lock);
p=active;
@@ -6475,51 +7075,51 @@ int TC_LOG_MMAP::log_one_transaction(my_xid xid)
cookie= (ulong)((uchar *)p->ptr - data); // can never be zero
*p->ptr++= xid;
p->free--;
- p->state= DIRTY;
- pthread_mutex_unlock(&p->lock);
+ p->state= PS_DIRTY;
+ mysql_mutex_unlock(&p->lock);
- pthread_mutex_lock(&LOCK_sync);
+ mysql_mutex_lock(&LOCK_sync);
if (syncing)
{ // somebody's syncing. let's wait
- pthread_mutex_unlock(&LOCK_active);
- pthread_mutex_lock(&p->lock);
+ mysql_mutex_unlock(&LOCK_active);
+ mysql_mutex_lock(&p->lock);
p->waiters++;
for (;;)
{
- int not_dirty = p->state != DIRTY;
- pthread_mutex_unlock(&p->lock);
+ int not_dirty = p->state != PS_DIRTY;
+ mysql_mutex_unlock(&p->lock);
if (not_dirty || !syncing)
break;
- pthread_cond_wait(&p->cond, &LOCK_sync);
- pthread_mutex_lock(&p->lock);
+ mysql_cond_wait(&p->cond, &LOCK_sync);
+ mysql_mutex_lock(&p->lock);
}
p->waiters--;
- err= p->state == ERROR;
- if (p->state != DIRTY) // page was synced
+ err= p->state == PS_ERROR;
+ if (p->state != PS_DIRTY) // page was synced
{
- pthread_mutex_unlock(&LOCK_sync);
+ mysql_mutex_unlock(&LOCK_sync);
if (p->waiters == 0)
- pthread_cond_signal(&COND_pool); // in case somebody's waiting
- pthread_mutex_unlock(&p->lock);
+ mysql_cond_signal(&COND_pool); // in case somebody's waiting
+ mysql_mutex_unlock(&p->lock);
goto done; // we're done
}
DBUG_ASSERT(!syncing);
- pthread_mutex_unlock(&p->lock);
+ mysql_mutex_unlock(&p->lock);
syncing = p;
- pthread_mutex_unlock(&LOCK_sync);
+ mysql_mutex_unlock(&LOCK_sync);
- pthread_mutex_lock(&LOCK_active);
+ mysql_mutex_lock(&LOCK_active);
active=0; // page is not active anymore
- pthread_cond_broadcast(&COND_active);
- pthread_mutex_unlock(&LOCK_active);
+ mysql_cond_broadcast(&COND_active);
+ mysql_mutex_unlock(&LOCK_active);
}
else
{
syncing = p; // place is vacant - take it
- pthread_mutex_unlock(&LOCK_sync);
+ mysql_mutex_unlock(&LOCK_sync);
active = 0; // page is not active anymore
- pthread_cond_broadcast(&COND_active);
- pthread_mutex_unlock(&LOCK_active);
+ mysql_cond_broadcast(&COND_active);
+ mysql_mutex_unlock(&LOCK_active);
}
err= sync();
@@ -6540,17 +7140,17 @@ int TC_LOG_MMAP::sync()
err= my_msync(fd, syncing->start, syncing->size * sizeof(my_xid), MS_SYNC);
/* page is synced. let's move it to the pool */
- pthread_mutex_lock(&LOCK_pool);
+ mysql_mutex_lock(&LOCK_pool);
pool_last->next=syncing;
pool_last=syncing;
syncing->next=0;
- syncing->state= err ? ERROR : POOL;
- pthread_cond_signal(&COND_pool); // in case somebody's waiting
- pthread_mutex_unlock(&LOCK_pool);
+ syncing->state= err ? PS_ERROR : PS_POOL;
+ mysql_cond_signal(&COND_pool); // in case somebody's waiting
+ mysql_mutex_unlock(&LOCK_pool);
/* marking 'syncing' slot free */
- pthread_mutex_lock(&LOCK_sync);
- pthread_cond_broadcast(&syncing->cond); // signal "sync done"
+ mysql_mutex_lock(&LOCK_sync);
+ mysql_cond_broadcast(&syncing->cond); // signal "sync done"
syncing=0;
/*
we check the "active" pointer without LOCK_active. Still, it's safe -
@@ -6561,8 +7161,8 @@ int TC_LOG_MMAP::sync()
(the thread that will send a signal below)
*/
if (active)
- pthread_cond_signal(&active->cond); // wake up a new syncer
- pthread_mutex_unlock(&LOCK_sync);
+ mysql_cond_signal(&active->cond); // wake up a new syncer
+ mysql_mutex_unlock(&LOCK_sync);
return err;
}
@@ -6579,7 +7179,7 @@ int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid)
DBUG_ASSERT(*x == xid);
DBUG_ASSERT(x >= p->start && x < p->end);
- pthread_mutex_lock(&p->lock);
+ mysql_mutex_lock(&p->lock);
*x=0;
p->free++;
DBUG_ASSERT(p->free <= p->size);
@@ -6587,8 +7187,8 @@ int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid)
if (p->free == p->size) // the page is completely empty
statistic_decrement(tc_log_cur_pages_used, &LOCK_status);
if (p->waiters == 0) // the page is in pool and ready to rock
- pthread_cond_signal(&COND_pool); // ping ... for overflow()
- pthread_mutex_unlock(&p->lock);
+ mysql_cond_signal(&COND_pool); // ping ... for overflow()
+ mysql_mutex_unlock(&p->lock);
return 0;
}
@@ -6597,31 +7197,31 @@ void TC_LOG_MMAP::close()
uint i;
switch (inited) {
case 6:
- pthread_mutex_destroy(&LOCK_sync);
- pthread_mutex_destroy(&LOCK_active);
- pthread_mutex_destroy(&LOCK_pool);
- pthread_cond_destroy(&COND_pool);
- pthread_cond_destroy(&COND_active);
- pthread_cond_destroy(&COND_queue_busy);
+ mysql_mutex_destroy(&LOCK_sync);
+ mysql_mutex_destroy(&LOCK_active);
+ mysql_mutex_destroy(&LOCK_pool);
+ mysql_cond_destroy(&COND_pool);
+ mysql_cond_destroy(&COND_active);
+ mysql_cond_destroy(&COND_queue_busy);
case 5:
- data[0]='A'; // garble the first (signature) byte, in case my_delete fails
+ data[0]='A'; // garble the first (signature) byte, in case mysql_file_delete fails
case 4:
for (i=0; i < npages; i++)
{
if (pages[i].ptr == 0)
break;
- pthread_mutex_destroy(&pages[i].lock);
- pthread_cond_destroy(&pages[i].cond);
+ mysql_mutex_destroy(&pages[i].lock);
+ mysql_cond_destroy(&pages[i].cond);
}
case 3:
- my_free((uchar*)pages, MYF(0));
+ my_free(pages);
case 2:
my_munmap((char*)data, (size_t)file_length);
case 1:
- my_close(fd, MYF(0));
+ mysql_file_close(fd, MYF(0));
}
if (inited>=5) // cannot do in the switch because of Windows
- my_delete(logname, MYF(MY_WME));
+ mysql_file_delete(key_file_tclog, logname, MYF(MY_WME));
inited=0;
}
@@ -6649,8 +7249,8 @@ int TC_LOG_MMAP::recover()
goto err1;
}
- if (hash_init(&xids, &my_charset_bin, tc_log_page_size/3, 0,
- sizeof(my_xid), 0, 0, MYF(0)))
+ if (my_hash_init(&xids, &my_charset_bin, tc_log_page_size/3, 0,
+ sizeof(my_xid), 0, 0, MYF(0)))
goto err1;
for ( ; p < end_p ; p++)
@@ -6663,12 +7263,12 @@ int TC_LOG_MMAP::recover()
if (ha_recover(&xids))
goto err2;
- hash_free(&xids);
+ my_hash_free(&xids);
bzero(data, (size_t)file_length);
return 0;
err2:
- hash_free(&xids);
+ my_hash_free(&xids);
err1:
sql_print_error("Crash recovery failed. Either correct the problem "
"(if it's, for example, out of memory error) and restart, "
@@ -6726,8 +7326,9 @@ int TC_LOG_BINLOG::open(const char *opt_name)
DBUG_ASSERT(total_ha_2pc > 1);
DBUG_ASSERT(opt_name && opt_name[0]);
- pthread_mutex_init(&LOCK_prep_xids, MY_MUTEX_INIT_FAST);
- pthread_cond_init (&COND_prep_xids, 0);
+ mysql_mutex_init(key_BINLOG_LOCK_prep_xids,
+ &LOCK_prep_xids, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_BINLOG_COND_prep_xids, &COND_prep_xids, 0);
if (!my_b_inited(&index_file))
{
@@ -6794,7 +7395,7 @@ int TC_LOG_BINLOG::open(const char *opt_name)
delete ev;
end_io_cache(&log);
- my_close(file, MYF(MY_WME));
+ mysql_file_close(file, MYF(MY_WME));
if (error)
goto err;
@@ -6808,8 +7409,8 @@ err:
void TC_LOG_BINLOG::close()
{
DBUG_ASSERT(prepared_xids==0);
- pthread_mutex_destroy(&LOCK_prep_xids);
- pthread_cond_destroy (&COND_prep_xids);
+ mysql_mutex_destroy(&LOCK_prep_xids);
+ mysql_cond_destroy(&COND_prep_xids);
}
/*
@@ -6824,27 +7425,12 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
int err;
DBUG_ENTER("TC_LOG_BINLOG::log_and_order");
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ binlog_cache_mngr *cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- trx_data->using_xa= TRUE;
- trx_data->xa_xid= xid;
- if (xid)
- {
- Xid_log_event xid_event(thd, xid);
- err= binlog_flush_trx_cache(thd, trx_data, &xid_event, all);
- }
- else
- {
- /*
- Empty xid occurs in XA COMMIT ... ONE PHASE.
- In this case, we do not have a MySQL xid for the transaction, and the
- external XA transaction coordinator will have to handle recovery if
- needed. So we end the transaction with a plain COMMIT query event.
- */
- Query_log_event end_event(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
- err= binlog_flush_trx_cache(thd, trx_data, &end_event, all);
- }
+ cache_mngr->using_xa= TRUE;
+ cache_mngr->xa_xid= xid;
+ err= binlog_commit_flush_xid_caches(thd, cache_mngr, all, xid);
DEBUG_SYNC(thd, "binlog_after_log_and_order");
@@ -6867,9 +7453,9 @@ TC_LOG_BINLOG::mark_xids_active(uint xid_count)
{
DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active");
DBUG_PRINT("info", ("xid_count=%u", xid_count));
- pthread_mutex_lock(&LOCK_prep_xids);
+ mysql_mutex_lock(&LOCK_prep_xids);
prepared_xids+= xid_count;
- pthread_mutex_unlock(&LOCK_prep_xids);
+ mysql_mutex_unlock(&LOCK_prep_xids);
DBUG_VOID_RETURN;
}
@@ -6886,13 +7472,16 @@ TC_LOG_BINLOG::mark_xid_done()
my_bool send_signal;
DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done");
- pthread_mutex_lock(&LOCK_prep_xids);
- DBUG_ASSERT(prepared_xids > 0);
- send_signal= !--prepared_xids;
- pthread_mutex_unlock(&LOCK_prep_xids);
+ mysql_mutex_lock(&LOCK_prep_xids);
+ // prepared_xids can be 0 if the transaction had ignorable errors.
+ DBUG_ASSERT(prepared_xids >= 0);
+ if (prepared_xids > 0)
+ prepared_xids--;
+ send_signal= (prepared_xids == 0);
+ mysql_mutex_unlock(&LOCK_prep_xids);
if (send_signal) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
- pthread_cond_signal(&COND_prep_xids);
+ mysql_cond_signal(&COND_prep_xids);
}
DBUG_VOID_RETURN;
}
@@ -6913,8 +7502,8 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
MEM_ROOT mem_root;
if (! fdle->is_valid() ||
- hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
- sizeof(my_xid), 0, 0, MYF(0)))
+ my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
+ sizeof(my_xid), 0, 0, MYF(0)))
goto err1;
init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE);
@@ -6940,12 +7529,12 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
goto err2;
free_root(&mem_root, MYF(0));
- hash_free(&xids);
+ my_hash_free(&xids);
return 0;
err2:
free_root(&mem_root, MYF(0));
- hash_free(&xids);
+ my_hash_free(&xids);
err1:
sql_print_error("Crash recovery failed. Either correct the problem "
"(if it's, for example, out of memory error) and restart, "
@@ -6987,12 +7576,12 @@ ulonglong mysql_bin_log_file_pos(void)
void
mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file)
{
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
- if (trx_data)
+ binlog_cache_mngr *cache_mngr;
+ if (opt_bin_log &&
+ (cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton)))
{
- *out_file= trx_data->last_commit_pos_file;
- *out_pos= (ulonglong)(trx_data->last_commit_pos_offset);
+ *out_file= cache_mngr->last_commit_pos_file;
+ *out_pos= (ulonglong)(cache_mngr->last_commit_pos_offset);
}
else
{
@@ -7008,24 +7597,25 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
void *var_ptr, const void *save)
{
ulong value= *((ulong *)save);
+ bool check_purge= false;
- pthread_mutex_lock(mysql_bin_log.get_log_lock());
+ mysql_mutex_lock(mysql_bin_log.get_log_lock());
if(mysql_bin_log.is_open())
{
- uint flags= RP_FORCE_ROTATE | RP_LOCK_LOG_IS_ALREADY_LOCKED |
- (binlog_checksum_options != (uint) value?
- RP_BINLOG_CHECKSUM_ALG_CHANGE : 0);
- if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE)
+ if (binlog_checksum_options != value)
mysql_bin_log.checksum_alg_reset= (uint8) value;
- mysql_bin_log.rotate_and_purge(flags);
+ if (mysql_bin_log.rotate(true, &check_purge))
+ check_purge= false;
}
else
{
binlog_checksum_options= value;
}
- DBUG_ASSERT((ulong) binlog_checksum_options == value);
- DBUG_ASSERT(mysql_bin_log.checksum_alg_reset == BINLOG_CHECKSUM_ALG_UNDEF);
- pthread_mutex_unlock(mysql_bin_log.get_log_lock());
+ DBUG_ASSERT(binlog_checksum_options == value);
+ mysql_bin_log.checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF;
+ mysql_mutex_unlock(mysql_bin_log.get_log_lock());
+ if (check_purge)
+ mysql_bin_log.purge();
}
@@ -7096,15 +7686,15 @@ set_binlog_snapshot_file(const char *src)
void
TC_LOG_BINLOG::set_status_variables(THD *thd)
{
- binlog_trx_data *trx_data;
+ binlog_cache_mngr *cache_mngr;
- if (thd)
- trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ if (thd && opt_bin_log)
+ cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
else
- trx_data= NULL;
+ cache_mngr= 0;
- bool have_snapshot= (trx_data && trx_data->last_commit_pos_file[0] != 0);
- pthread_mutex_lock(&LOCK_commit_ordered);
+ bool have_snapshot= (cache_mngr && cache_mngr->last_commit_pos_file[0] != 0);
+ mysql_mutex_lock(&LOCK_commit_ordered);
binlog_status_var_num_commits= this->num_commits;
binlog_status_var_num_group_commits= this->num_group_commits;
if (!have_snapshot)
@@ -7112,34 +7702,18 @@ TC_LOG_BINLOG::set_status_variables(THD *thd)
set_binlog_snapshot_file(last_commit_pos_file);
binlog_snapshot_position= last_commit_pos_offset;
}
- pthread_mutex_unlock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
if (have_snapshot)
{
- set_binlog_snapshot_file(trx_data->last_commit_pos_file);
- binlog_snapshot_position= trx_data->last_commit_pos_offset;
+ set_binlog_snapshot_file(cache_mngr->last_commit_pos_file);
+ binlog_snapshot_position= cache_mngr->last_commit_pos_offset;
}
}
struct st_mysql_storage_engine binlog_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
-mysql_declare_plugin(binlog)
-{
- MYSQL_STORAGE_ENGINE_PLUGIN,
- &binlog_storage_engine,
- "binlog",
- "MySQL AB",
- "This is a pseudo storage engine to represent the binlog in a transaction",
- PLUGIN_LICENSE_GPL,
- binlog_init, /* Plugin Init */
- NULL, /* Plugin Deinit */
- 0x0100 /* 1.0 */,
- binlog_status_vars_top, /* status variables */
- binlog_sys_vars, /* system variables */
- NULL /* config options */
-}
-mysql_declare_plugin_end;
maria_declare_plugin(binlog)
{
MYSQL_STORAGE_ENGINE_PLUGIN,