diff options
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r-- | sql/rpl_rli.cc | 165 |
1 files changed, 162 insertions, 3 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index d5cd6a3efed..6d53e6c3187 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -31,6 +31,13 @@ static int count_relay_log_space(Relay_log_info* rli); +/** + Current replication state (hash of last GTID executed, per replication + domain). +*/ +rpl_slave_state rpl_global_gtid_slave_state; + + // Defined in slave.cc int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, @@ -51,7 +58,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) abort_pos_wait(0), slave_run_id(0), sql_thd(0), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), - tables_to_lock(0), tables_to_lock_count(0), + gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0), last_event_start_time(0), deferred_events(NULL),m_flags(0), row_stmt_start_timestamp(0), long_find_row_note_printed(false), m_annotate_event(0) @@ -1091,7 +1098,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) if (until_condition == UNTIL_MASTER_POS) { - if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id) + if (ev && ev->server_id == (uint32) global_system_variables.server_id && + !replicate_same_server_id) DBUG_RETURN(FALSE); log_name= group_master_log_name; log_pos= (!ev)? group_master_log_pos : @@ -1189,7 +1197,7 @@ bool Relay_log_info::cached_charset_compare(char *charset) const void Relay_log_info::stmt_done(my_off_t event_master_log_pos, - time_t event_creation_time) + time_t event_creation_time, THD *thd) { #ifndef DBUG_OFF extern uint debug_not_change_ts_if_art_event; @@ -1224,7 +1232,23 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, else { inc_group_relay_log_pos(event_master_log_pos); + if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, this)) + { + report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, + "Failed to update GTID state in %s.%s, slave state may become " + "inconsistent: %d: %s", + "mysql", rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); + /* + At this point we are not in a transaction (for example after DDL), + so we can not roll back. Anyway, normally updates to the slave + state table should not fail, and if they do, at least we made the + DBA aware of the problem in the error log. + */ + } + DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE();); flush_relay_log_info(this); + DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); /* Note that Rotate_log_event::do_apply_event() does not call this function, so there is no chance that a fake rotate event resets @@ -1356,4 +1380,139 @@ void Relay_log_info::slave_close_thread_tables(THD *thd) clear_tables_to_lock(); DBUG_VOID_RETURN; } + + +int +rpl_load_gtid_slave_state(THD *thd) +{ + TABLE_LIST tlist; + TABLE *table; + bool table_opened= false; + bool table_scanned= false; + struct local_element { uint64 sub_id; rpl_gtid gtid; }; + struct local_element *entry; + HASH hash; + int err= 0; + uint32 i; + uint64 highest_seq_no= 0; + DBUG_ENTER("rpl_load_gtid_slave_state"); + + rpl_global_gtid_slave_state.lock(); + bool loaded= rpl_global_gtid_slave_state.loaded; + rpl_global_gtid_slave_state.unlock(); + if (loaded) + DBUG_RETURN(0); + + my_hash_init(&hash, &my_charset_bin, 32, + offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id), + sizeof(uint32), NULL, my_free, HASH_UNIQUE); + + mysql_reset_thd_for_next_command(thd, 0); + + tlist.init_one_table(STRING_WITH_LEN("mysql"), + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length, + NULL, TL_READ); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; + + bitmap_set_all(table->read_set); + if ((err= table->file->ha_rnd_init_with_error(1))) + goto end; + table_scanned= true; + for (;;) + { + uint32 domain_id, server_id; + uint64 sub_id, seq_no; + uchar *rec; + + if ((err= table->file->ha_rnd_next(table->record[0]))) + { + if (err == HA_ERR_RECORD_DELETED) + continue; + else if (err == HA_ERR_END_OF_FILE) + break; + else + goto end; + } + domain_id= (ulonglong)table->field[0]->val_int(); + sub_id= (ulonglong)table->field[1]->val_int(); + server_id= (ulonglong)table->field[2]->val_int(); + seq_no= (ulonglong)table->field[3]->val_int(); + DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n", + (unsigned)domain_id, (unsigned)server_id, + (ulong)seq_no, (ulong)sub_id)); + if (seq_no > highest_seq_no) + highest_seq_no= seq_no; + + if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0))) + { + entry= (struct local_element *)rec; + if (entry->sub_id >= sub_id) + continue; + entry->sub_id= sub_id; + DBUG_ASSERT(entry->gtid.domain_id == domain_id); + entry->gtid.server_id= server_id; + entry->gtid.seq_no= seq_no; + } + else + { + if (!(entry= (struct local_element *)my_malloc(sizeof(*entry), + MYF(MY_WME)))) + { + err= 1; + goto end; + } + entry->sub_id= sub_id; + entry->gtid.domain_id= domain_id; + entry->gtid.server_id= server_id; + entry->gtid.seq_no= seq_no; + if ((err= my_hash_insert(&hash, (uchar *)entry))) + { + my_free(entry); + goto end; + } + } + } + + rpl_global_gtid_slave_state.lock(); + for (i= 0; i < hash.records; ++i) + { + entry= (struct local_element *)my_hash_element(&hash, i); + if ((err= rpl_global_gtid_slave_state.update(entry->gtid.domain_id, + entry->gtid.server_id, + entry->sub_id, + entry->gtid.seq_no))) + { + rpl_global_gtid_slave_state.unlock(); + goto end; + } + } + rpl_global_gtid_slave_state.loaded= true; + rpl_global_gtid_slave_state.unlock(); + + err= 0; /* Clear HA_ERR_END_OF_FILE */ + +end: + if (table_scanned) + { + table->file->ha_index_or_rnd_end(); + ha_commit_trans(thd, FALSE); + ha_commit_trans(thd, TRUE); + } + if (table_opened) + { + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + } + my_hash_free(&hash); + mysql_bin_log.bump_seq_no_counter_if_needed(highest_seq_no); + DBUG_RETURN(err); +} + #endif |