summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc110
1 files changed, 104 insertions, 6 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 36239cfa055..e632fe629f5 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -2470,7 +2470,7 @@ static int find_uniq_filename(char *name, ulong next_log_number)
char buff[FN_REFLEN], ext_buf[FN_REFLEN];
struct st_my_dir *dir_info;
reg1 struct fileinfo *file_info;
- ulong max_found, next, number;
+ ulong max_found, next, UNINIT_VAR(number);
size_t buf_length, length;
char *start, *end;
int error= 0;
@@ -3630,6 +3630,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
new_xid_list_entry->binlog_name= name_mem;
new_xid_list_entry->binlog_name_len= len;
new_xid_list_entry->xid_count= 0;
+ new_xid_list_entry->notify_count= 0;
/*
Find the name for the Initial binlog checkpoint.
@@ -4990,7 +4991,55 @@ MYSQL_BIN_LOG::is_xidlist_idle_nolock()
return true;
}
+#ifdef WITH_WSREP
+inline bool
+is_gtid_cached_internal(IO_CACHE *file)
+{
+ uchar data[EVENT_TYPE_OFFSET+1];
+ bool result= false;
+ my_off_t write_pos= my_b_tell(file);
+ if (reinit_io_cache(file, READ_CACHE, 0, 0, 0))
+ return false;
+ /*
+ In the cache we have gtid event if , below condition is true,
+ */
+ my_b_read(file, data, sizeof(data));
+ uint event_type= (uchar)data[EVENT_TYPE_OFFSET];
+ if (event_type == GTID_LOG_EVENT)
+ result= true;
+ /*
+ Cleanup , Why because we have not read the full buffer
+ and this will cause next to next reinit_io_cache(called in write_cache)
+ to make cache empty.
+ */
+ file->read_pos= file->read_end;
+ if (reinit_io_cache(file, WRITE_CACHE, write_pos, 0, 0))
+ return false;
+ return result;
+}
+#endif
+#ifdef WITH_WSREP
+inline bool
+MYSQL_BIN_LOG::is_gtid_cached(THD *thd)
+{
+ binlog_cache_mngr *mngr= (binlog_cache_mngr *) thd_get_ha_data(
+ thd, binlog_hton);
+ if (!mngr)
+ return false;
+ binlog_cache_data *cache_trans= mngr->get_binlog_cache_data(
+ use_trans_cache(thd, true));
+ binlog_cache_data *cache_stmt= mngr->get_binlog_cache_data(
+ use_trans_cache(thd, false));
+ if (cache_trans && !cache_trans->empty() &&
+ is_gtid_cached_internal(&cache_trans->cache_log))
+ return true;
+ if (cache_stmt && !cache_stmt->empty() &&
+ is_gtid_cached_internal(&cache_stmt->cache_log))
+ return true;
+ return false;
+}
+#endif
/**
Create a new log file name.
@@ -5582,7 +5631,36 @@ THD::binlog_start_trans_and_stmt()
cache_mngr->trx_cache.get_prev_position() == MY_OFF_T_UNDEF)
{
this->binlog_set_stmt_begin();
- if (in_multi_stmt_transaction_mode())
+ bool mstmt_mode= in_multi_stmt_transaction_mode();
+#ifdef WITH_WSREP
+ /* Write Gtid
+ Get domain id only when gtid mode is set
+ If this event is replicate through a master then ,
+ we will forward the same gtid another nodes
+ We have to do this only one time in mysql transaction.
+ Since this function is called multiple times , We will check for
+ ha_info->is_started()
+ */
+ Ha_trx_info *ha_info;
+ ha_info= this->ha_data[binlog_hton->slot].ha_info + (mstmt_mode ? 1 : 0);
+
+ if (!ha_info->is_started() && wsrep_gtid_mode
+ && this->variables.gtid_seq_no)
+ {
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton);
+ binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(1);
+ IO_CACHE *file= &cache_data->cache_log;
+ Log_event_writer writer(file, cache_data);
+ Gtid_log_event gtid_event(this, this->variables.gtid_seq_no,
+ this->variables.gtid_domain_id,
+ true, LOG_EVENT_SUPPRESS_USE_F,
+ true, 0);
+ gtid_event.server_id= this->variables.server_id;
+ writer.write(&gtid_event);
+ }
+#endif
+ if (mstmt_mode)
trans_register_ha(this, TRUE, binlog_hton);
trans_register_ha(this, FALSE, binlog_hton);
/*
@@ -5862,7 +5940,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
DBUG_PRINT("enter", ("standalone: %d", standalone));
#ifdef WITH_WSREP
- if (WSREP(thd) && thd->wsrep_trx_meta.gtid.seqno != -1 && wsrep_gtid_mode)
+ if (WSREP(thd) && thd->wsrep_trx_meta.gtid.seqno != -1 && wsrep_gtid_mode && !thd->variables.gtid_seq_no)
{
domain_id= wsrep_gtid_domain_id;
} else {
@@ -5914,6 +5992,12 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
/* Write the event to the binary log. */
DBUG_ASSERT(this == &mysql_bin_log);
+
+#ifdef WITH_WSREP
+ if (wsrep_gtid_mode && is_gtid_cached(thd))
+ DBUG_RETURN(false);
+#endif
+
if (write_event(&gtid_event))
DBUG_RETURN(true);
status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
@@ -9718,9 +9802,20 @@ void
TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie)
{
xid_count_per_binlog *entry= static_cast<xid_count_per_binlog *>(cookie);
+ bool found_entry= false;
mysql_mutex_lock(&LOCK_binlog_background_thread);
- entry->next_in_queue= binlog_background_thread_queue;
- binlog_background_thread_queue= entry;
+ /* count the same notification kind from different engines */
+ for (xid_count_per_binlog *link= binlog_background_thread_queue;
+ link && !found_entry; link= link->next_in_queue)
+ {
+ if ((found_entry= (entry == link)))
+ entry->notify_count++;
+ }
+ if (!found_entry)
+ {
+ entry->next_in_queue= binlog_background_thread_queue;
+ binlog_background_thread_queue= entry;
+ }
mysql_cond_signal(&COND_binlog_background_thread);
mysql_mutex_unlock(&LOCK_binlog_background_thread);
}
@@ -9815,13 +9910,16 @@ binlog_background_thread(void *arg __attribute__((unused)))
);
while (queue)
{
+ long count= queue->notify_count;
THD_STAGE_INFO(thd, stage_binlog_processing_checkpoint_notify);
DEBUG_SYNC(thd, "binlog_background_thread_before_mark_xid_done");
/* Set the thread start time */
thd->set_time();
/* Grab next pointer first, as mark_xid_done() may free the element. */
next= queue->next_in_queue;
- mysql_bin_log.mark_xid_done(queue->binlog_id, true);
+ queue->notify_count= 0;
+ for (long i= 0; i <= count; i++)
+ mysql_bin_log.mark_xid_done(queue->binlog_id, true);
queue= next;
DBUG_EXECUTE_IF("binlog_background_checkpoint_processed",