summaryrefslogtreecommitdiff
path: root/sql/rpl_rli.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r--sql/rpl_rli.cc165
1 files changed, 162 insertions, 3 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index d5cd6a3efed..6d53e6c3187 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -31,6 +31,13 @@
static int count_relay_log_space(Relay_log_info* rli);
+/**
+ Current replication state (hash of last GTID executed, per replication
+ domain).
+*/
+rpl_slave_state rpl_global_gtid_slave_state;
+
+
// Defined in slave.cc
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
@@ -51,7 +58,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
abort_pos_wait(0), slave_run_id(0), sql_thd(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
- tables_to_lock(0), tables_to_lock_count(0),
+ gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), deferred_events(NULL),m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false),
m_annotate_event(0)
@@ -1091,7 +1098,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
if (until_condition == UNTIL_MASTER_POS)
{
- if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
+ if (ev && ev->server_id == (uint32) global_system_variables.server_id &&
+ !replicate_same_server_id)
DBUG_RETURN(FALSE);
log_name= group_master_log_name;
log_pos= (!ev)? group_master_log_pos :
@@ -1189,7 +1197,7 @@ bool Relay_log_info::cached_charset_compare(char *charset) const
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
- time_t event_creation_time)
+ time_t event_creation_time, THD *thd)
{
#ifndef DBUG_OFF
extern uint debug_not_change_ts_if_art_event;
@@ -1224,7 +1232,23 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
else
{
inc_group_relay_log_pos(event_master_log_pos);
+ if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, this))
+ {
+ report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
+ "Failed to update GTID state in %s.%s, slave state may become "
+ "inconsistent: %d: %s",
+ "mysql", rpl_gtid_slave_state_table_name.str,
+ thd->stmt_da->sql_errno(), thd->stmt_da->message());
+ /*
+ At this point we are not in a transaction (for example after DDL),
+ so we can not roll back. Anyway, normally updates to the slave
+ state table should not fail, and if they do, at least we made the
+ DBA aware of the problem in the error log.
+ */
+ }
+ DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
flush_relay_log_info(this);
+ DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
/*
Note that Rotate_log_event::do_apply_event() does not call this
function, so there is no chance that a fake rotate event resets
@@ -1356,4 +1380,139 @@ void Relay_log_info::slave_close_thread_tables(THD *thd)
clear_tables_to_lock();
DBUG_VOID_RETURN;
}
+
+
+int
+rpl_load_gtid_slave_state(THD *thd)
+{
+ TABLE_LIST tlist;
+ TABLE *table;
+ bool table_opened= false;
+ bool table_scanned= false;
+ struct local_element { uint64 sub_id; rpl_gtid gtid; };
+ struct local_element *entry;
+ HASH hash;
+ int err= 0;
+ uint32 i;
+ uint64 highest_seq_no= 0;
+ DBUG_ENTER("rpl_load_gtid_slave_state");
+
+ rpl_global_gtid_slave_state.lock();
+ bool loaded= rpl_global_gtid_slave_state.loaded;
+ rpl_global_gtid_slave_state.unlock();
+ if (loaded)
+ DBUG_RETURN(0);
+
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+
+ mysql_reset_thd_for_next_command(thd, 0);
+
+ tlist.init_one_table(STRING_WITH_LEN("mysql"),
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length,
+ NULL, TL_READ);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table_opened= true;
+ table= tlist.table;
+
+ if ((err= gtid_check_rpl_slave_state_table(table)))
+ goto end;
+
+ bitmap_set_all(table->read_set);
+ if ((err= table->file->ha_rnd_init_with_error(1)))
+ goto end;
+ table_scanned= true;
+ for (;;)
+ {
+ uint32 domain_id, server_id;
+ uint64 sub_id, seq_no;
+ uchar *rec;
+
+ if ((err= table->file->ha_rnd_next(table->record[0])))
+ {
+ if (err == HA_ERR_RECORD_DELETED)
+ continue;
+ else if (err == HA_ERR_END_OF_FILE)
+ break;
+ else
+ goto end;
+ }
+ domain_id= (ulonglong)table->field[0]->val_int();
+ sub_id= (ulonglong)table->field[1]->val_int();
+ server_id= (ulonglong)table->field[2]->val_int();
+ seq_no= (ulonglong)table->field[3]->val_int();
+ DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n",
+ (unsigned)domain_id, (unsigned)server_id,
+ (ulong)seq_no, (ulong)sub_id));
+ if (seq_no > highest_seq_no)
+ highest_seq_no= seq_no;
+
+ if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ {
+ entry= (struct local_element *)rec;
+ if (entry->sub_id >= sub_id)
+ continue;
+ entry->sub_id= sub_id;
+ DBUG_ASSERT(entry->gtid.domain_id == domain_id);
+ entry->gtid.server_id= server_id;
+ entry->gtid.seq_no= seq_no;
+ }
+ else
+ {
+ if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
+ MYF(MY_WME))))
+ {
+ err= 1;
+ goto end;
+ }
+ entry->sub_id= sub_id;
+ entry->gtid.domain_id= domain_id;
+ entry->gtid.server_id= server_id;
+ entry->gtid.seq_no= seq_no;
+ if ((err= my_hash_insert(&hash, (uchar *)entry)))
+ {
+ my_free(entry);
+ goto end;
+ }
+ }
+ }
+
+ rpl_global_gtid_slave_state.lock();
+ for (i= 0; i < hash.records; ++i)
+ {
+ entry= (struct local_element *)my_hash_element(&hash, i);
+ if ((err= rpl_global_gtid_slave_state.update(entry->gtid.domain_id,
+ entry->gtid.server_id,
+ entry->sub_id,
+ entry->gtid.seq_no)))
+ {
+ rpl_global_gtid_slave_state.unlock();
+ goto end;
+ }
+ }
+ rpl_global_gtid_slave_state.loaded= true;
+ rpl_global_gtid_slave_state.unlock();
+
+ err= 0; /* Clear HA_ERR_END_OF_FILE */
+
+end:
+ if (table_scanned)
+ {
+ table->file->ha_index_or_rnd_end();
+ ha_commit_trans(thd, FALSE);
+ ha_commit_trans(thd, TRUE);
+ }
+ if (table_opened)
+ {
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ }
+ my_hash_free(&hash);
+ mysql_bin_log.bump_seq_no_counter_if_needed(highest_seq_no);
+ DBUG_RETURN(err);
+}
+
#endif