summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc498
1 files changed, 341 insertions, 157 deletions
diff --git a/sql/log.cc b/sql/log.cc
index d119b88c4e0..480e3b696cc 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -55,10 +55,14 @@
#include "sql_show.h"
#include "my_pthread.h"
#include "semisync_master.h"
-#include "wsrep_mysqld.h"
#include "sp_rcontext.h"
#include "sp_head.h"
+#include "wsrep_mysqld.h"
+#ifdef WITH_WSREP
+#include "wsrep_trans_observer.h"
+#endif /* WITH_WSREP */
+
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
#define MAX_TIME_SIZE 32
@@ -869,10 +873,10 @@ bool Log_to_csv_event_handler::
Open_tables_backup open_tables_backup;
CHARSET_INFO *client_cs= thd->variables.character_set_client;
bool save_time_zone_used;
- long query_time= (long) MY_MIN(query_utime/1000000, TIME_MAX_VALUE_SECONDS);
- long lock_time= (long) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS);
- long query_time_micro= (long) (query_utime % 1000000);
- long lock_time_micro= (long) (lock_utime % 1000000);
+ ulong query_time= (ulong) MY_MIN(query_utime/1000000, TIME_MAX_VALUE_SECONDS);
+ ulong lock_time= (ulong) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS);
+ ulong query_time_micro= (ulong) (query_utime % 1000000);
+ ulong lock_time_micro= (ulong) (lock_utime % 1000000);
DBUG_ENTER("Log_to_csv_event_handler::log_slow");
@@ -1165,6 +1169,10 @@ bool LOGGER::error_log_print(enum loglevel level, const char *format,
{
bool error= FALSE;
Log_event_handler **current_handler;
+ THD *thd= current_thd;
+
+ if (likely(thd))
+ thd->error_printed_to_log= 1;
/* currently we don't need locking here as there is no error_log table */
for (current_handler= error_log_handler_list ; *current_handler ;)
@@ -1702,7 +1710,7 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
#ifdef WITH_WSREP
if (cache_mngr && !cache_mngr->trx_cache.empty()) {
- IO_CACHE* cache= get_trans_log(thd);
+ IO_CACHE* cache= cache_mngr->get_binlog_cache_log(true);
uchar *buf;
size_t len=0;
wsrep_write_cache_buf(cache, &buf, &len);
@@ -2186,18 +2194,30 @@ void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional)
{
if (is_transactional)
{
- my_message(ER_TRANS_CACHE_FULL, ER_THD(thd, ER_TRANS_CACHE_FULL), MYF(MY_WME));
+ my_message(ER_TRANS_CACHE_FULL, ER_THD(thd, ER_TRANS_CACHE_FULL), MYF(0));
}
else
{
- my_message(ER_STMT_CACHE_FULL, ER_THD(thd, ER_STMT_CACHE_FULL), MYF(MY_WME));
+ my_message(ER_STMT_CACHE_FULL, ER_THD(thd, ER_STMT_CACHE_FULL), MYF(0));
}
}
else
{
- my_error(ER_ERROR_ON_WRITE, MYF(MY_WME), name, errno);
+ my_error(ER_ERROR_ON_WRITE, MYF(0), name, errno);
}
-
+#ifdef WITH_WSREP
+ /* If wsrep transaction is active and binlog emulation is on,
+ binlog write error may leave transaction without any registered
+ htons. This makes wsrep rollback hooks to be skipped and the
+ transaction will remain alive in wsrep world after rollback.
+ Register binlog hton here to ensure that rollback happens in full. */
+ if (WSREP_EMULATE_BINLOG(thd))
+ {
+ if (is_transactional)
+ trans_register_ha(thd, TRUE, binlog_hton);
+ trans_register_ha(thd, FALSE, binlog_hton);
+ }
+#endif /* WITH_WSREP */
DBUG_VOID_RETURN;
}
@@ -2290,8 +2310,17 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command.
*/
+#ifdef WITH_WSREP
+ /* for streaming replication, we must replicate savepoint rollback so that
+ slaves can maintain SR transactions
+ */
+ if (unlikely(thd->wsrep_trx().is_streaming() ||
+ (trans_has_updated_non_trans_table(thd)) ||
+ (thd->variables.option_bits & OPTION_KEEP_LOG)))
+#else
if (unlikely(trans_has_updated_non_trans_table(thd) ||
(thd->variables.option_bits & OPTION_KEEP_LOG)))
+#endif /* WITH_WSREP */
{
char buf[1024];
String log_query(buf, sizeof(buf), &my_charset_bin);
@@ -2417,7 +2446,7 @@ static void setup_windows_event_source()
// Create the event source registry key
dwError= RegCreateKey(HKEY_LOCAL_MACHINE,
- "SYSTEM\\CurrentControlSet\\Services\\EventLog\\Application\\MySQL",
+ "SYSTEM\\CurrentControlSet\\Services\\EventLog\\Application\\MariaDB",
&hRegKey);
/* Name of the PE module that contains the message resource */
@@ -2449,11 +2478,19 @@ static void setup_windows_event_source()
exceeds FN_REFLEN; (ii) if the number of extensions is exhausted;
or (iii) some other error happened while examining the filesystem.
+ @param name Base name of file
+ @param min_log_number_to_use minimum log number to choose. Set by
+ CHANGE MASTER .. TO
+ @param last_used_log_number If 0, find log number based on files.
+ If not 0, then use *last_used_log_number +1
+ Will be update to new generated number
@return
+ 0 ok
nonzero if not possible to get unique filename.
*/
-static int find_uniq_filename(char *name, ulong next_log_number)
+static int find_uniq_filename(char *name, ulong min_log_number_to_use,
+ ulong *last_used_log_number)
{
uint i;
char buff[FN_REFLEN], ext_buf[FN_REFLEN];
@@ -2472,24 +2509,34 @@ static int find_uniq_filename(char *name, ulong next_log_number)
*end='.';
length= (size_t) (end - start + 1);
- if ((DBUG_EVALUATE_IF("error_unique_log_filename", 1,
- unlikely(!(dir_info= my_dir(buff,
- MYF(MY_DONT_SORT)))))))
- { // This shouldn't happen
- strmov(end,".1"); // use name+1
- DBUG_RETURN(1);
- }
- file_info= dir_info->dir_entry;
- max_found= next_log_number ? next_log_number-1 : 0;
- for (i= dir_info->number_of_files ; i-- ; file_info++)
+ /* The following matches the code for my_dir () below */
+ DBUG_EXECUTE_IF("error_unique_log_filename",
+ {
+ strmov(end,".1");
+ DBUG_RETURN(1);
+ });
+
+ if (*last_used_log_number)
+ max_found= *last_used_log_number;
+ else
{
- if (strncmp(file_info->name, start, length) == 0 &&
- test_if_number(file_info->name+length, &number,0))
+ if (unlikely(!(dir_info= my_dir(buff, MYF(MY_DONT_SORT)))))
+ { // This shouldn't happen
+ strmov(end,".1"); // use name+1
+ DBUG_RETURN(1);
+ }
+ file_info= dir_info->dir_entry;
+ max_found= min_log_number_to_use ? min_log_number_to_use-1 : 0;
+ for (i= dir_info->number_of_files ; i-- ; file_info++)
{
- set_if_bigger(max_found, number);
+ if (strncmp(file_info->name, start, length) == 0 &&
+ test_if_number(file_info->name+length, &number,0))
+ {
+ set_if_bigger(max_found, number);
+ }
}
+ my_dirend(dir_info);
}
- my_dirend(dir_info);
/* check if reached the maximum possible extension number */
if (max_found >= MAX_LOG_UNIQUE_FN_EXT)
@@ -2528,6 +2575,7 @@ index files.", name, ext_buf, (strlen(ext_buf) + (end - name)));
error= 1;
goto end;
}
+ *last_used_log_number= next;
/* print warning if reaching the end of available extensions. */
if ((next > (MAX_LOG_UNIQUE_FN_EXT - LOG_WARN_UNIQUE_FN_EXT_LEFT)))
@@ -2646,7 +2694,7 @@ bool MYSQL_LOG::open(
#endif
if ((file= mysql_file_open(log_file_key, log_file_name, open_flags,
- MYF(MY_WME | ME_WAITTANG))) < 0)
+ MYF(MY_WME))) < 0)
goto err;
if (is_fifo)
@@ -2779,19 +2827,24 @@ int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name,
ulong next_log_number)
{
fn_format(new_name, log_name, mysql_data_home, "", 4);
- if (log_type == LOG_BIN)
+ return 0;
+}
+
+int MYSQL_BIN_LOG::generate_new_name(char *new_name, const char *log_name,
+ ulong next_log_number)
+{
+ fn_format(new_name, log_name, mysql_data_home, "", 4);
+ if (!fn_ext(log_name)[0])
{
- if (!fn_ext(log_name)[0])
+ if (DBUG_EVALUATE_IF("binlog_inject_new_name_error", TRUE, FALSE) ||
+ unlikely(find_uniq_filename(new_name, next_log_number,
+ &last_used_log_number)))
{
- if (DBUG_EVALUATE_IF("binlog_inject_new_name_error", TRUE, FALSE) ||
- unlikely(find_uniq_filename(new_name, next_log_number)))
- {
- THD *thd= current_thd;
- if (unlikely(thd))
- my_error(ER_NO_UNIQUE_LOGFILE, MYF(ME_FATALERROR), log_name);
- sql_print_error(ER_DEFAULT(ER_NO_UNIQUE_LOGFILE), log_name);
- return 1;
- }
+ THD *thd= current_thd;
+ if (unlikely(thd))
+ my_error(ER_NO_UNIQUE_LOGFILE, MYF(ME_FATAL), log_name);
+ sql_print_error(ER_DEFAULT(ER_NO_UNIQUE_LOGFILE), log_name);
+ return 1;
}
}
return 0;
@@ -3191,7 +3244,8 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
:reset_master_pending(0), mark_xid_done_waiting(0),
- bytes_written(0), file_id(1), open_count(1),
+ bytes_written(0), last_used_log_number(0),
+ file_id(1), open_count(1),
group_commit_queue(0), group_commit_queue_busy(FALSE),
num_commits(0), num_group_commits(0),
group_commit_trigger_count(0), group_commit_trigger_timeout(0),
@@ -4167,6 +4221,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD *thd, bool create_new_log,
name=0; // Protect against free
close(LOG_CLOSE_TO_BE_OPENED);
+ last_used_log_number= 0; // Reset log number cache
+
/*
First delete all old log files and then update the index file.
As we first delete the log files and do not use sort of logging,
@@ -4620,7 +4676,7 @@ int MYSQL_BIN_LOG::open_purge_index_file(bool destroy)
if (!my_b_inited(&purge_index_file))
{
if ((file= my_open(purge_index_file_name, O_RDWR | O_CREAT | O_BINARY,
- MYF(MY_WME | ME_WAITTANG))) < 0 ||
+ MYF(MY_WME))) < 0 ||
init_io_cache(&purge_index_file, file, IO_SIZE,
(destroy ? WRITE_CACHE : READ_CACHE),
0, 0, MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL)))
@@ -5102,7 +5158,11 @@ bool MYSQL_BIN_LOG::is_active(const char *log_file_name_arg)
int MYSQL_BIN_LOG::new_file()
{
- return new_file_impl(1);
+ int res;
+ mysql_mutex_lock(&LOCK_log);
+ res= new_file_impl();
+ mysql_mutex_unlock(&LOCK_log);
+ return res;
}
/*
@@ -5111,7 +5171,7 @@ int MYSQL_BIN_LOG::new_file()
*/
int MYSQL_BIN_LOG::new_file_without_locking()
{
- return new_file_impl(0);
+ return new_file_impl();
}
@@ -5127,7 +5187,7 @@ int MYSQL_BIN_LOG::new_file_without_locking()
The new file name is stored last in the index file
*/
-int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
+int MYSQL_BIN_LOG::new_file_impl()
{
int error= 0, close_on_error= FALSE;
char new_name[FN_REFLEN], *new_name_ptr, *old_name, *file_to_open;
@@ -5136,14 +5196,12 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
File UNINIT_VAR(old_file);
DBUG_ENTER("MYSQL_BIN_LOG::new_file_impl");
- if (need_lock)
- mysql_mutex_lock(&LOCK_log);
+ DBUG_ASSERT(log_type == LOG_BIN);
mysql_mutex_assert_owner(&LOCK_log);
if (!is_open())
{
DBUG_PRINT("info",("log is closed"));
- mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
@@ -5162,7 +5220,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
#ifdef ENABLE_AND_FIX_HANG
close_on_error= TRUE;
#endif
- goto end;
+ goto end2;
}
new_name_ptr=new_name;
@@ -5189,7 +5247,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
close_on_error= TRUE;
my_printf_error(ER_ERROR_ON_WRITE,
ER_THD_OR_DEFAULT(current_thd, ER_CANT_OPEN_FILE),
- MYF(ME_FATALERROR), name, errno);
+ MYF(ME_FATAL), name, errno);
goto end;
}
bytes_written += r.data_written;
@@ -5258,14 +5316,21 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
/* handle reopening errors */
if (unlikely(error))
{
- my_error(ER_CANT_OPEN_FILE, MYF(ME_FATALERROR), file_to_open, error);
+ my_error(ER_CANT_OPEN_FILE, MYF(ME_FATAL), file_to_open, error);
close_on_error= TRUE;
}
my_free(old_name);
end:
+ /* In case of errors, reuse the last generated log file name */
+ if (unlikely(error))
+ {
+ DBUG_ASSERT(last_used_log_number > 0);
+ last_used_log_number--;
+ }
+end2:
if (delay_close)
{
clear_inuse_flag_when_closing(old_file);
@@ -5291,8 +5356,6 @@ end:
}
mysql_mutex_unlock(&LOCK_index);
- if (need_lock)
- mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
@@ -5652,7 +5715,18 @@ THD::binlog_start_trans_and_stmt()
this->binlog_set_stmt_begin();
bool mstmt_mode= in_multi_stmt_transaction_mode();
#ifdef WITH_WSREP
- /* Write Gtid
+ /*
+ With wsrep binlog emulation we can skip the rest because the
+ binlog cache will not be written into binlog. Note however that
+ because of this the hton callbacks will not get called to clean
+ up the cache, so this must be done explicitly when the transaction
+ terminates.
+ */
+ if (WSREP_EMULATE_BINLOG_NNULL(this))
+ {
+ DBUG_VOID_RETURN;
+ }
+ /* Write Gtid
Get domain id only when gtid mode is set
If this event is replicate through a master then ,
we will forward the same gtid another nodes
@@ -5757,10 +5831,10 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
/* Ensure that all events in a GTID group are in the same cache */
if (variables.option_bits & OPTION_GTID_BEGIN)
is_transactional= 1;
-
+
/* Pre-conditions */
DBUG_ASSERT(is_current_stmt_binlog_format_row());
- DBUG_ASSERT(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open());
+ DBUG_ASSERT(WSREP_EMULATE_BINLOG_NNULL(this) || mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event
@@ -5958,7 +6032,9 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
DBUG_PRINT("enter", ("standalone: %d", standalone));
#ifdef WITH_WSREP
- if (WSREP(thd) && thd->wsrep_trx_meta.gtid.seqno != -1 && wsrep_gtid_mode && !thd->variables.gtid_seq_no)
+ if (WSREP(thd) &&
+ (wsrep_thd_trx_seqno(thd) > 0) &&
+ wsrep_gtid_mode && !thd->variables.gtid_seq_no)
{
domain_id= wsrep_gtid_domain_id;
} else {
@@ -6061,7 +6137,7 @@ MYSQL_BIN_LOG::write_state_to_file()
goto end;
err:
- sql_print_error("Error writing binlog state to file '%s'.\n", buf);
+ sql_print_error("Error writing binlog state to file '%s'.", buf);
if (log_inited)
end_io_cache(&cache);
end:
@@ -6121,7 +6197,7 @@ MYSQL_BIN_LOG::read_state_from_file()
goto end;
err:
- sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf);
+ sql_print_error("Error reading binlog GTID state from file '%s'.", buf);
end:
if (log_inited)
end_io_cache(&cache);
@@ -6275,7 +6351,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
*/
/* applier and replayer can skip writing binlog events */
if ((WSREP_EMULATE_BINLOG(thd) &&
- IF_WSREP(thd->wsrep_exec_mode != REPL_RECV, 0)) || is_open())
+ IF_WSREP(thd->wsrep_cs().mode() == wsrep::client_state::m_local, 0)) || is_open())
{
my_off_t UNINIT_VAR(my_org_b_tell);
#ifdef HAVE_REPLICATION
@@ -6306,11 +6382,25 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
if (direct)
{
+ /* We come here only for incident events */
int res;
uint64 commit_id= 0;
+ MDL_request mdl_request;
DBUG_PRINT("info", ("direct is set"));
+ DBUG_ASSERT(!thd->backup_commit_lock);
+
+ mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT);
+ thd->mdl_context.acquire_lock(&mdl_request,
+ thd->variables.lock_wait_timeout);
+ thd->backup_commit_lock= &mdl_request;
+
if ((res= thd->wait_for_prior_commit()))
+ {
+ if (mdl_request.ticket)
+ thd->mdl_context.release_lock(mdl_request.ticket);
+ thd->backup_commit_lock= 0;
DBUG_RETURN(res);
+ }
file= &log_file;
my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
@@ -6325,7 +6415,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
commit_name.length);
commit_id= entry->val_int(&null_value);
});
- if (write_gtid_event(thd, true, using_trans, commit_id))
+ res= write_gtid_event(thd, true, using_trans, commit_id);
+ if (mdl_request.ticket)
+ thd->mdl_context.release_lock(mdl_request.ticket);
+ thd->backup_commit_lock= 0;
+ if (res)
goto err;
}
else
@@ -6466,25 +6560,8 @@ err:
it's list before dump-thread tries to send it
*/
update_binlog_end_pos(offset);
- /*
- If a transaction with the LOAD DATA statement is divided
- into logical mini-transactions (of the 10K rows) and binlog
- is rotated, then the last portion of data may be lost due to
- wsrep handler re-registration at the boundary of the split.
- Since splitting of the LOAD DATA into mini-transactions is
- logical, we should not allow these mini-transactions to fall
- into separate binlogs. Therefore, it is necessary to prohibit
- the rotation of binlog in the middle of processing LOAD DATA:
- */
-#ifdef WITH_WSREP
- if (!thd->wsrep_split_flag)
- {
-#endif /* WITH_WSREP */
if (unlikely((error= rotate(false, &check_purge))))
check_purge= false;
-#ifdef WITH_WSREP
- }
-#endif /* WITH_WSREP */
}
}
}
@@ -7211,25 +7288,8 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
likely(!(error= flush_and_sync(0))))
{
update_binlog_end_pos();
- /*
- If a transaction with the LOAD DATA statement is divided
- into logical mini-transactions (of the 10K rows) and binlog
- is rotated, then the last portion of data may be lost due to
- wsrep handler re-registration at the boundary of the split.
- Since splitting of the LOAD DATA into mini-transactions is
- logical, we should not allow these mini-transactions to fall
- into separate binlogs. Therefore, it is necessary to prohibit
- the rotation of binlog in the middle of processing LOAD DATA:
- */
-#ifdef WITH_WSREP
- if (!thd->wsrep_split_flag)
- {
-#endif /* WITH_WSREP */
if (unlikely((error= rotate(false, &check_purge))))
check_purge= false;
-#ifdef WITH_WSREP
- }
-#endif /* WITH_WSREP */
}
offset= my_b_tell(&log_file);
@@ -7280,7 +7340,7 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name_arg
ability to do crash recovery - crash recovery will just have to scan a
bit more of the binlog than strictly necessary.
*/
- sql_print_error("Failed to write binlog checkpoint event to binary log\n");
+ sql_print_error("Failed to write binlog checkpoint event to binary log");
}
offset= my_b_tell(&log_file);
@@ -7421,7 +7481,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
group_commit_entry *entry, *orig_queue, *last;
wait_for_commit *cur;
wait_for_commit *wfc;
+ bool backup_lock_released= 0;
+ int result= 0;
+ THD *thd= orig_entry->thd;
DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
+ DBUG_ASSERT(thd == current_thd);
/*
Check if we need to wait for another transaction to commit before us.
@@ -7433,8 +7497,10 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
*/
wfc= orig_entry->thd->wait_for_commit_ptr;
orig_entry->queued_by_other= false;
- if (wfc && wfc->waitee)
+ if (wfc && wfc->waitee.load(std::memory_order_acquire))
{
+ wait_for_commit *loc_waitee;
+
mysql_mutex_lock(&wfc->LOCK_wait_commit);
/*
Do an extra check here, this time safely under lock.
@@ -7446,10 +7512,25 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
before setting the flag, so there is no risk that we can queue ahead of
it.
*/
- if (wfc->waitee && !wfc->waitee->commit_started)
+ if ((loc_waitee= wfc->waitee.load(std::memory_order_relaxed)) &&
+ !loc_waitee->commit_started)
{
PSI_stage_info old_stage;
- wait_for_commit *loc_waitee;
+
+ /*
+ Release MDL_BACKUP_COMMIT LOCK while waiting for other threads to
+ commit.
+ This is needed to avoid deadlock between the other threads (which not
+ yet have the MDL_BACKUP_COMMIT_LOCK) and any threads using
+ BACKUP LOCK BLOCK_COMMIT.
+ */
+ if (thd->backup_commit_lock && thd->backup_commit_lock->ticket &&
+ !backup_lock_released)
+ {
+ backup_lock_released= 1;
+ thd->mdl_context.release_lock(thd->backup_commit_lock->ticket);
+ thd->backup_commit_lock->ticket= 0;
+ }
/*
By setting wfc->opaque_pointer to our own entry, we mark that we are
@@ -7471,7 +7552,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
&wfc->LOCK_wait_commit,
&stage_waiting_for_prior_transaction_to_commit,
&old_stage);
- while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed(1))
+ while ((loc_waitee= wfc->waitee.load(std::memory_order_relaxed)) &&
+ !orig_entry->thd->check_killed(1))
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
@@ -7489,14 +7571,18 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
do
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
- } while (wfc->waitee);
+ } while (wfc->waitee.load(std::memory_order_relaxed));
}
else
{
/* We were killed, so remove us from the list of waitee. */
wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
- wfc->waitee= NULL;
+ /*
+ This is the thread clearing its own status, it is no longer on
+ the list of waiters. So no memory barriers are needed here.
+ */
+ wfc->waitee.store(NULL, std::memory_order_relaxed);
orig_entry->thd->EXIT_COND(&old_stage);
/* Interrupted by kill. */
@@ -7506,7 +7592,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
wfc->wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wfc->wakeup_error,
ER_THD(orig_entry->thd, wfc->wakeup_error), MYF(0));
- DBUG_RETURN(-1);
+ result= -1;
+ goto end;
}
}
orig_entry->thd->EXIT_COND(&old_stage);
@@ -7520,12 +7607,13 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
then there is nothing else to do.
*/
if (orig_entry->queued_by_other)
- DBUG_RETURN(0);
+ goto end;
if (wfc && wfc->wakeup_error)
{
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
- DBUG_RETURN(-1);
+ result= -1;
+ goto end;
}
/* Now enqueue ourselves in the group commit queue. */
@@ -7686,14 +7774,39 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
DBUG_PRINT("info", ("Queued for group commit as %s",
(orig_queue == NULL) ? "leader" : "participant"));
- DBUG_RETURN(orig_queue == NULL);
+ result= orig_queue == NULL;
+
+end:
+ if (backup_lock_released)
+ thd->mdl_context.acquire_lock(thd->backup_commit_lock,
+ thd->variables.lock_wait_timeout);
+ DBUG_RETURN(result);
}
bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
int is_leader= queue_for_group_commit(entry);
-
+#ifdef WITH_WSREP
+ if (wsrep_is_active(entry->thd) &&
+ wsrep_run_commit_hook(entry->thd, entry->all))
+ {
+ /*
+ Release commit order and if leader, wait for prior commit to
+ complete. This establishes total order for group leaders.
+ */
+ if (wsrep_ordered_commit(entry->thd, entry->all, wsrep_apply_error()))
+ {
+ entry->thd->wakeup_subsequent_commits(1);
+ return 1;
+ }
+ if (is_leader)
+ {
+ if (entry->thd->wait_for_prior_commit())
+ return 1;
+ }
+ }
+#endif /* WITH_WSREP */
/*
The first in the queue handles group commit for all; the others just wait
to be signalled when group commit is done.
@@ -7775,10 +7888,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
switch (entry->error)
{
case ER_ERROR_ON_WRITE:
- my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno);
+ my_error(ER_ERROR_ON_WRITE, MYF(ME_ERROR_LOG), name, entry->commit_errno);
break;
case ER_ERROR_ON_READ:
- my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH),
+ my_error(ER_ERROR_ON_READ, MYF(ME_ERROR_LOG),
entry->error_cache->file_name, entry->commit_errno);
break;
default:
@@ -7789,7 +7902,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
*/
my_printf_error(entry->error,
"Error writing transaction to binary log: %d",
- MYF(ME_NOREFRESH), entry->error);
+ MYF(ME_ERROR_LOG), entry->error);
}
/*
@@ -7998,20 +8111,6 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
mark_xids_active(binlog_id, xid_count);
}
- /*
- If a transaction with the LOAD DATA statement is divided
- into logical mini-transactions (of the 10K rows) and binlog
- is rotated, then the last portion of data may be lost due to
- wsrep handler re-registration at the boundary of the split.
- Since splitting of the LOAD DATA into mini-transactions is
- logical, we should not allow these mini-transactions to fall
- into separate binlogs. Therefore, it is necessary to prohibit
- the rotation of binlog in the middle of processing LOAD DATA:
- */
-#ifdef WITH_WSREP
- if (!leader->thd->wsrep_split_flag)
- {
-#endif /* WITH_WSREP */
if (rotate(false, &check_purge))
{
/*
@@ -8028,12 +8127,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
when the transaction has been safely committed in the engine.
*/
leader->cache_mngr->delayed_error= true;
- my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, errno);
+ my_error(ER_ERROR_ON_WRITE, MYF(ME_ERROR_LOG), name, errno);
check_purge= false;
}
-#ifdef WITH_WSREP
- }
-#endif /* WITH_WSREP */
/* In case of binlog rotate, update the correct current binlog offset. */
commit_offset= my_b_write_tell(&log_file);
}
@@ -8682,18 +8778,35 @@ bool flush_error_log()
}
#ifdef _WIN32
+struct eventlog_source
+{
+ HANDLE handle;
+ eventlog_source()
+ {
+ setup_windows_event_source();
+ handle = RegisterEventSource(NULL, "MariaDB");
+ }
+
+ ~eventlog_source()
+ {
+ if (handle)
+ DeregisterEventSource(handle);
+ }
+};
+
+static eventlog_source eventlog;
+
static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
size_t length, size_t buffLen)
{
- HANDLE event;
+ HANDLE event= eventlog.handle;
char *buffptr= buff;
DBUG_ENTER("print_buffer_to_nt_eventlog");
/* Add ending CR/LF's to string, overwrite last chars if necessary */
strmov(buffptr+MY_MIN(length, buffLen-5), "\r\n\r\n");
- setup_windows_event_source();
- if ((event= RegisterEventSource(NULL,"MySQL")))
+ if (event)
{
switch (level) {
case ERROR_LEVEL:
@@ -8709,7 +8822,6 @@ static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
0, (LPCSTR*) &buffptr, NULL);
break;
}
- DeregisterEventSource(event);
}
DBUG_VOID_RETURN;
@@ -10510,7 +10622,7 @@ set_binlog_snapshot_file(const char *src)
Copy out current values of status variables, for SHOW STATUS or
information_schema.global_status.
- This is called only under LOCK_show_status, so we can fill in a static array.
+ This is called only under LOCK_all_status_vars, so we can fill in a static array.
*/
void
TC_LOG_BINLOG::set_status_variables(THD *thd)
@@ -10635,7 +10747,9 @@ maria_declare_plugin(binlog)
maria_declare_plugin_end;
#ifdef WITH_WSREP
-IO_CACHE * get_trans_log(THD * thd)
+#include "wsrep_mysqld.h"
+
+IO_CACHE *wsrep_get_trans_cache(THD * thd)
{
DBUG_ASSERT(binlog_hton->slot != HA_SLOT_UNDEF);
binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*)
@@ -10648,45 +10762,115 @@ IO_CACHE * get_trans_log(THD * thd)
return NULL;
}
-
-bool wsrep_trans_cache_is_empty(THD *thd)
-{
- binlog_cache_mngr *const cache_mngr=
- (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- return (!cache_mngr || cache_mngr->trx_cache.empty());
-}
-
-
-void thd_binlog_trx_reset(THD * thd)
+void wsrep_thd_binlog_trx_reset(THD * thd)
{
+ DBUG_ENTER("wsrep_thd_binlog_trx_reset");
+ WSREP_DEBUG("wsrep_thd_binlog_reset");
/*
todo: fix autocommit select to not call the caller
*/
- if (thd_get_ha_data(thd, binlog_hton) != NULL)
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
{
- binlog_cache_mngr *const cache_mngr=
- (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- if (cache_mngr)
+ cache_mngr->reset(false, true);
+ if (!cache_mngr->stmt_cache.empty())
{
- cache_mngr->reset(false, true);
- if (!cache_mngr->stmt_cache.empty())
- {
- WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query());
- cache_mngr->stmt_cache.reset();
- }
+ WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query());
+ cache_mngr->stmt_cache.reset();
}
}
thd->clear_binlog_table_maps();
+ DBUG_VOID_RETURN;
}
-
-void thd_binlog_rollback_stmt(THD * thd)
+void wsrep_thd_binlog_stmt_rollback(THD * thd)
{
- WSREP_DEBUG("thd_binlog_rollback_stmt connection: %llu",
- thd->thread_id);
+ DBUG_ENTER("wsrep_thd_binlog_stmt_rollback");
+ WSREP_DEBUG("wsrep_thd_binlog_stmt_rollback");
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
if (cache_mngr)
- cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
+ {
+ thd->binlog_remove_pending_rows_event(TRUE, TRUE);
+ cache_mngr->stmt_cache.reset();
+ }
+ DBUG_VOID_RETURN;
+}
+
+bool wsrep_stmt_rollback_is_safe(THD* thd)
+{
+ bool ret(true);
+
+ DBUG_ENTER("wsrep_binlog_stmt_rollback_is_safe");
+
+ binlog_cache_mngr *cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+
+
+ if (binlog_hton && cache_mngr)
+ {
+ binlog_cache_data * trx_cache = &cache_mngr->trx_cache;
+ if (thd->wsrep_sr().fragments_certified() > 0 &&
+ (trx_cache->get_prev_position() == MY_OFF_T_UNDEF ||
+ trx_cache->get_prev_position() < thd->wsrep_sr().log_position()))
+ {
+ WSREP_DEBUG("statement rollback is not safe for streaming replication"
+ " pre-stmt_pos: %llu, frag repl pos: %zu\n"
+ "Thread: %llu, SQL: %s",
+ trx_cache->get_prev_position(),
+ thd->wsrep_sr().log_position(),
+ thd->thread_id, thd->query());
+ ret = false;
+ }
+ }
+ DBUG_RETURN(ret);
}
+
+void wsrep_register_binlog_handler(THD *thd, bool trx)
+{
+ DBUG_ENTER("register_binlog_handler");
+ /*
+ If this is the first call to this function while processing a statement,
+ the transactional cache does not have a savepoint defined. So, in what
+ follows:
+ . an implicit savepoint is defined;
+ . callbacks are registered;
+ . binary log is set as read/write.
+
+ The savepoint allows for truncating the trx-cache transactional changes
+ fail. Callbacks are necessary to flush caches upon committing or rolling
+ back a statement or a transaction. However, notifications do not happen
+ if the binary log is set as read/write.
+ */
+ binlog_cache_mngr *cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ /* cache_mngr may be missing e.g. in mtr test ev51914.test */
+ if (cache_mngr)
+ {
+ /*
+ Set an implicit savepoint in order to be able to truncate a trx-cache.
+ */
+ if (cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF)
+ {
+ my_off_t pos= 0;
+ binlog_trans_log_savepos(thd, &pos);
+ cache_mngr->trx_cache.set_prev_position(pos);
+ }
+
+ /*
+ Set callbacks in order to be able to call commmit or rollback.
+ */
+ if (trx)
+ trans_register_ha(thd, TRUE, binlog_hton);
+ trans_register_ha(thd, FALSE, binlog_hton);
+
+ /*
+ Set the binary log as read/write otherwise callbacks are not called.
+ */
+ thd->ha_data[binlog_hton->slot].ha_info[0].set_trx_read_write();
+ }
+ DBUG_VOID_RETURN;
+}
+
#endif /* WITH_WSREP */