diff options
-rw-r--r-- | scripts/mysql_system_tables.sql | 6 | ||||
-rw-r--r-- | sql/log.cc | 9 | ||||
-rw-r--r-- | sql/log_event.cc | 196 | ||||
-rw-r--r-- | sql/log_event.h | 24 | ||||
-rw-r--r-- | sql/slave.cc | 36 | ||||
-rw-r--r-- | sql/sql_repl.cc | 329 |
6 files changed, 591 insertions, 9 deletions
diff --git a/scripts/mysql_system_tables.sql b/scripts/mysql_system_tables.sql index a55c5f60351..8f8d6356999 100644 --- a/scripts/mysql_system_tables.sql +++ b/scripts/mysql_system_tables.sql @@ -20,6 +20,12 @@ -- set sql_mode=''; + +-- We want this to be created with the default storage engine. +-- This way, if InnoDB is used we get crash safety, and if MyISAM is used +-- we avoid mixed-engine transactions. +CREATE TABLE IF NOT EXISTS rpl_slave_state (domain_id INT UNSIGNED NOT NULL, sub_id BIGINT UNSIGNED NOT NULL, server_id INT UNSIGNED NOT NULL, seq_no BIGINT UNSIGNED NOT NULL, PRIMARY KEY (domain_id, sub_id)) comment='Replication slave GTID state'; + set storage_engine=myisam; flush tables; diff --git a/sql/log.cc b/sql/log.cc index 84a113768d9..e82f25bdc50 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -3270,9 +3270,12 @@ bool MYSQL_BIN_LOG::open(const char *log_name, there had been an entry (domain_id, server_id, 0). */ - Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state); - if (gl_ev.write(&log_file)) - goto err; + if (rpl_global_gtid_binlog_state.count()) + { + Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state); + if (gl_ev.write(&log_file)) + goto err; + } /* Output a binlog checkpoint event at the start of the binlog file. */ diff --git a/sql/log_event.cc b/sql/log_event.cc index f124ee1eae7..7a32a812214 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -6279,6 +6279,73 @@ rpl_slave_state::next_subid(uint32 domain_id) #endif +/* + Prepare the current slave state as a string, suitable for sending to the + master to request to receive binlog events starting from that GTID state. + + The state consists of the most recently applied GTID for each domain_id, + ie. the one with the highest sub_id within each domain_id. + + The result is appended to dest as <GTID>,<GTID>,...,<GTID>, each GTID being + <domain_id>-<server_id>-<seq_no>; this is the format parsed by + slave_connection_state::load(). + + Returns 0 if ok, 1 if an element with an empty list is encountered. 
+*/ + +int +rpl_slave_state::tostring(String *dest) +{ + bool first= true; + uint32 i; + int err= 1; + + lock(); + + for (i= 0; i < hash.records; ++i) + { + uint64 best_sub_id; + rpl_gtid best_gtid; + element *e= (element *)my_hash_element(&hash, i); + list_element *l= e->list; + + DBUG_ASSERT(l /* We should never have empty list in element. */); + if (!l) + goto err; + + best_gtid.domain_id= e->domain_id; + best_gtid.server_id= l->server_id; + best_gtid.seq_no= l->seq_no; + best_sub_id= l->sub_id; + while ((l= l->next)) + { + if (l->sub_id > best_sub_id) + { + best_sub_id= l->sub_id; + best_gtid.server_id= l->server_id; + best_gtid.seq_no= l->seq_no; + } + } + + if (first) + first= false; + else + dest->append(",",1); /* GTIDs are separated by ',' as load() expects. */ + dest->append_ulonglong(best_gtid.domain_id); + dest->append("-",1); + dest->append_ulonglong(best_gtid.server_id); + dest->append("-",1); + dest->append_ulonglong(best_gtid.seq_no); + } + + err= 0; + +err: + unlock(); + return err; +} + + rpl_binlog_state::rpl_binlog_state() { my_hash_init(&hash, &my_charset_bin, 32, @@ -6410,6 +6477,119 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src) } return 0; } + + +slave_connection_state::slave_connection_state() +{ + my_hash_init(&hash, &my_charset_bin, 32, + offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free, + HASH_UNIQUE); +} + + +slave_connection_state::~slave_connection_state() +{ + my_hash_free(&hash); +} + + +/* + Create a hash from the slave GTID state that is sent to master when slave + connects to start replication. + + The state is sent as <GTID>,<GTID>,...,<GTID>, for example: + + 0-2-112,1-4-1022 + + The state gives for each domain_id the GTID to start replication from for + the corresponding replication stream. So domain_id must be unique. + + Returns 0 if ok, non-zero if error due to malformed input. + + Note that input string is built by slave server, so it will not be incorrect + unless bug/corruption/malicious server. 
So we just need basic sanity check, + not fancy user-friendly error message. +*/ + +int +slave_connection_state::load(char *slave_request, size_t len) +{ + char *p, *q, *end; + uint64 v; + uint32 domain_id, server_id; + uint64 seq_no; + uchar *rec; + rpl_gtid *gtid; + int err= 0; + + my_hash_reset(&hash); + p= slave_request; + end= slave_request + len; + for (;;) + { + q= end; + v= (uint64)my_strtoll10(p, &q, &err); + if (err != 0 || v > (uint32)0xffffffff || *q != '-') + return 1; + domain_id= (uint32)v; + p= q+1; + q= end; + v= (uint64)my_strtoll10(p, &q, &err); + if (err != 0 || v > (uint32)0xffffffff || *q != '-') + return 1; + server_id= (uint32)v; + p= q+1; + q= end; + seq_no= (uint64)my_strtoll10(p, &q, &err); + if (err != 0) + return 1; + + if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME)))) + return 1; + gtid= (rpl_gtid *)rec; + gtid->domain_id= domain_id; + gtid->server_id= server_id; + gtid->seq_no= seq_no; + if (my_hash_insert(&hash, rec)) + { + my_free(rec); + return 1; + } + if (q == end) + break; /* Finished. */ + if (*q != ',') + return 1; + p= q+1; + } + + return 0; +} + + +rpl_gtid * +slave_connection_state::find(uint32 domain_id) +{ + return (rpl_gtid *) my_hash_search(&hash, (const uchar *)(&domain_id), 0); +} + + +void +slave_connection_state::remove(const rpl_gtid *in_gtid) +{ + bool err; + uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0); +#ifndef DBUG_OFF + rpl_gtid *slave_gtid= (rpl_gtid *)rec; + DBUG_ASSERT(rec /* We should never try to remove not present domain_id. 
*/); + DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id); + DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no); +#endif + + err= my_hash_delete(&hash, rec); + DBUG_ASSERT(!err); +} + + #endif /* MYSQL_SERVER */ @@ -6443,6 +6623,33 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, { } + +/* + Used to record GTID while sending binlog to slave, without having to + fully construct every Gtid_log_event() needlessly. + + Returns false and fills in the output parameters if ok, true if the event + is too short to contain a GTID event header. 
+*/ +bool +Gtid_log_event::peek(const char *event_start, size_t event_len, + uint32 *domain_id, uint32 *server_id, uint64 *seq_no, + uchar *flags2) +{ + const char *p; + if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + return true; + *server_id= uint4korr(event_start + SERVER_ID_OFFSET); + p= event_start + LOG_EVENT_HEADER_LEN; + *seq_no= uint8korr(p); + p+= 8; + *domain_id= uint4korr(p); + p+= 4; + *flags2= (uchar)*p; /* Caller tests FL_STANDALONE in flags2. */ + return false; +} + + bool Gtid_log_event::write(IO_CACHE *file) { diff --git a/sql/log_event.h b/sql/log_event.h index f3b055060aa..3b0988faef4 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3008,6 +3008,7 @@ struct rpl_slave_state int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, bool in_transaction); uint64 next_subid(uint32 domain_id); + int tostring(String *dest); void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); } void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); } @@ -3044,6 +3045,26 @@ struct rpl_binlog_state int read_from_iocache(IO_CACHE *src); }; + +/* + Represent the GTID state that a slave connection to a master requests + the master to start sending binlog events from. +*/ +struct slave_connection_state +{ + /* Mapping from domain_id to the GTID requested for that domain. 
*/ + HASH hash; + + slave_connection_state(); + ~slave_connection_state(); + + int load(char *slave_request, size_t len); + rpl_gtid *find(uint32 domain_id); + void remove(const rpl_gtid *gtid); + ulong count() const { return hash.records; } +}; + + /** @class Gtid_log_event @@ -3134,6 +3155,9 @@ public: bool write(IO_CACHE *file); static int make_compatible_event(String *packet, bool *need_dummy_event, ulong ev_offset, uint8 checksum_alg); + static bool peek(const char *event_start, size_t event_len, + uint32 *domain_id, uint32 *server_id, uint64 *seq_no, + uchar *flags2); #endif }; diff --git a/sql/slave.cc b/sql/slave.cc index 38bf6559b35..66913f3ccf0 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1784,6 +1784,42 @@ past_checksum: after_set_capability: #endif + /* Request dump start from slave replication GTID state. */ + + /* ToDo: This needs to be configurable somehow in a useful way ... */ + if (rpl_global_gtid_slave_state.count()) + { + int rc; + char str_buf[256]; + String connect_state(str_buf, sizeof(str_buf), system_charset_info); + connect_state.length(0); + + connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"), + system_charset_info); + rpl_global_gtid_slave_state.tostring(&connect_state); + connect_state.append(STRING_WITH_LEN("'"), system_charset_info); + rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @slave_connect_state failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_connect_state."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } + err: if (errmsg) { diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 3c206a8857f..cd32dd7148a 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc 
@@ -507,6 +507,26 @@ get_mariadb_slave_capability(THD *thd) /* + Get the value of the @slave_connect_state user variable into the supplied + String (this is the GTID connect state requested by the connecting slave). + + Returns false if error (ie. slave did not set the variable and does not + want to use GTID to set start position), true if success. +*/ +static bool +get_slave_connect_state(THD *thd, String *out_str) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_connect_state") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_str(&null_value, out_str, 0) && !null_value; +} + + +/* Function prepares and sends repliation heartbeat event. @param net net object of THD @@ -569,6 +589,239 @@ static int send_heartbeat_event(NET* net, String* packet, } +struct binlog_file_entry +{ + binlog_file_entry *next; + char *name; +}; + +/* + Read the binlog index file into a linked list allocated on memroot. + + Each entry is pushed onto the head of the list, so the list is returned in + reverse index order. + + Returns NULL on error (after calling my_error()), or if the index is empty. +*/ +static binlog_file_entry * +get_binlog_list(MEM_ROOT *memroot) +{ + IO_CACHE *index_file; + char fname[FN_REFLEN]; + size_t length; + binlog_file_entry *current_list= NULL, *e; + DBUG_ENTER("get_binlog_list"); + + if (!mysql_bin_log.is_open()) + { + my_error(ER_NO_BINARY_LOGGING, MYF(0)); + DBUG_RETURN(NULL); + } + + mysql_bin_log.lock_index(); + index_file=mysql_bin_log.get_index_file(); + reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0); + + /* The file ends with EOF or empty line */ + while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1) + { + --length; /* Remove the newline */ + if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) || + !(e->name= strmake_root(memroot, fname, length))) + { + mysql_bin_log.unlock_index(); + my_error(ER_OUTOFMEMORY, MYF(0), length + 1 + sizeof(*e)); + DBUG_RETURN(NULL); + } + e->next= current_list; + current_list= e; + } + mysql_bin_log.unlock_index(); + + DBUG_RETURN(current_list); +} + +/* + Find the Gtid_list_log_event at the start of a binlog. 
+ + NULL for ok, non-NULL error message for error. + + If ok, then the event is returned in *out_gtid_list. This can be NULL if we + get back to binlogs written by old server version without GTID support. If + so, it means we have reached the point to start from, as no GTID events can + exist in earlier binlogs. + + The caller owns the returned event and must delete it. +*/ +static const char * +get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) +{ + Format_description_log_event init_fdle(BINLOG_VERSION); + Format_description_log_event *fdle; + Log_event *ev; + const char *errormsg = NULL; + + *out_gtid_list= NULL; + + if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle, + opt_master_verify_checksum)) || + ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) + { + delete ev; /* Frees an event of the wrong type; no-op if ev is NULL. */ + return "Could not read format description log event while looking for " + "GTID position in binlog"; + } + + fdle= static_cast<Format_description_log_event *>(ev); + + for (;;) + { + Log_event_type typ; + + ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum); + if (!ev) + { + errormsg= "Could not read GTID list event while looking for GTID " + "position in binlog"; + break; + } + typ= ev->get_type_code(); + if (typ == GTID_LIST_EVENT) + break; /* Done, found it */ + /* Any other event is only inspected for its type, so free it here. */ + delete ev; + if (typ == ROTATE_EVENT || typ == STOP_EVENT || + typ == FORMAT_DESCRIPTION_EVENT) + continue; /* Continue looking */ + + /* We did not find any Gtid_list_log_event, must be old binlog. */ + ev= NULL; + break; + } + + delete fdle; + *out_gtid_list= static_cast<Gtid_list_log_event *>(ev); + return errormsg; +} + + +/* + Check if every GTID requested by the slave is contained in this (or a later) + binlog file. Return true if so, false if not. 
+*/ +static bool +contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev) +{ + uint32 i; + + for (i= 0; i < glev->count; ++i) + { + const rpl_gtid *gtid= st->find(glev->list[i].domain_id); + if (gtid != NULL && + gtid->server_id == glev->list[i].server_id && + gtid->seq_no <= glev->list[i].seq_no) + { + /* + The slave needs to receive gtid, but it is contained in an earlier + binlog file. So we need to search back further. + */ + return false; + } + } + return true; +} + + +/* + Find the name of the binlog file to start reading for a slave that connects + using GTID state. + + Returns the file name in out_name, which must be of size at least FN_REFLEN. + + Returns NULL on ok, error message on error. +*/ +static const char * +gtid_find_binlog_file(slave_connection_state *state, char *out_name) +{ + MEM_ROOT memroot; + binlog_file_entry *list; + Gtid_list_log_event *glev= NULL; + const char *errormsg= NULL; + IO_CACHE cache; + File file = (File)-1; + char buf[FN_REFLEN]; + + bzero((char*) &cache, sizeof(cache)); + init_alloc_root(&memroot, 10*(FN_REFLEN+sizeof(binlog_file_entry)), 0); + if (!(list= get_binlog_list(&memroot))) + { + errormsg= "Out of memory while looking for GTID position in binlog"; + goto end; + } + + while (list) + { + if (!list->next) + { + /* + It should be safe to read the currently used binlog, as we will only + read the header part that is already written. + + But if that does not work on windows, then we will need to cache the + event somewhere in memory I suppose - that could work too. + */ + } + /* + Read the Gtid_list_log_event at the start of the binlog file to + get the binlog state. 
+ */ + if (normalize_binlog_name(buf, list->name, false)) + { + errormsg= "Failed to determine binlog file name while looking for " + "GTID position in binlog"; + goto end; + } + if ((file= open_binlog(&cache, buf, &errormsg)) == (File)-1 || + (errormsg= get_gtid_list_event(&cache, &glev))) + goto end; + + if (!glev || contains_all_slave_gtid(state, glev)) + { + strmake(out_name, buf, FN_REFLEN); + goto end; + } + /* Not this file: release its event and file before trying the next one. */ + delete glev; + glev= NULL; + end_io_cache(&cache); + mysql_file_close(file, MYF(MY_WME)); + file= (File)-1; + bzero((char*) &cache, sizeof(cache)); + list= list->next; + } + + /* We reached the end without finding anything. */ + errormsg= "Could not find GTID state requested by slave in any binlog " + "files. Probably the slave state is too old and required binlog files " + "have been purged."; + +end: + delete glev; /* No-op if NULL. */ + if (file != (File)-1) + { + end_io_cache(&cache); + mysql_file_close(file, MYF(MY_WME)); + } + + free_root(&memroot, MYF(0)); + return errormsg; +} + + +enum enum_gtid_skip_type { + GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION +}; + /* Helper function for mysql_binlog_send() to write an event down the slave connection. @@ -579,10 +832,65 @@ static const char * send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, Log_event_type event_type, char *log_file_name, IO_CACHE *log, int mariadb_slave_capability, - ulong ev_offset, uint8 current_checksum_alg) + ulong ev_offset, uint8 current_checksum_alg, + bool using_gtid_state, slave_connection_state *gtid_state, + enum_gtid_skip_type *gtid_skip_group) { my_off_t pos; + /* Skip GTID event groups until we reach slave position within a domain_id. 
*/ + if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0) + { + uint32 server_id, domain_id; + uint64 seq_no; + uchar flags2; + rpl_gtid *gtid; + size_t len= packet->length(); + + if (ev_offset > len || + Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, + &domain_id, &server_id, &seq_no, &flags2)) + return "Failed to read Gtid_log_event: corrupt binlog"; + gtid= gtid_state->find(domain_id); + if (gtid != NULL) + { + /* Skip this event group if we have not yet reached slave start pos. */ + if (server_id != gtid->server_id || seq_no <= gtid->seq_no) + *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + /* + Delete this entry if we have reached slave start position (so we will + not skip subsequent events and won't have to look them up and check). + */ + if (server_id == gtid->server_id && seq_no >= gtid->seq_no) + gtid_state->remove(gtid); + } + } + + /* + Skip event group if we have not yet reached the correct slave GTID position. + + Note that slave that understands GTID can also tolerate holes, so there is + no need to supply dummy event. + */ + switch (*gtid_skip_group) + { + case GTID_SKIP_STANDALONE: + if (event_type != INTVAR_EVENT && + event_type != RAND_EVENT && + event_type != USER_VAR_EVENT && + event_type != TABLE_MAP_EVENT && + event_type != ANNOTATE_ROWS_EVENT) + *gtid_skip_group= GTID_SKIP_NOT; + return NULL; + case GTID_SKIP_TRANSACTION: + if (event_type == XID_EVENT /* ToDo || is_COMMIT_query_event() */) + *gtid_skip_group= GTID_SKIP_NOT; + return NULL; + case GTID_SKIP_NOT: + break; + } + /* Do not send annotate_rows events unless slave requested it. 
*/ if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) { @@ -722,6 +1030,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, mysql_mutex_t *log_lock; mysql_cond_t *log_cond; int mariadb_slave_capability; + char str_buf[256]; + String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); + bool using_gtid_state; + slave_connection_state gtid_state; + enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT; uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; int old_max_allowed_packet= thd->variables.max_allowed_packet; @@ -748,6 +1061,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, set_timespec_nsec(*heartbeat_ts, 0); } mariadb_slave_capability= get_mariadb_slave_capability(thd); + + connect_gtid_state.length(0); + using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); + if (global_system_variables.log_warnings > 1) sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", (int)thd->variables.server_id, log_ident, (ulong)pos); @@ -781,10 +1098,30 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, } name=search_file_name; - if (log_ident[0]) - mysql_bin_log.make_log_name(search_file_name, log_ident); + if (using_gtid_state) + { + if (gtid_state.load(connect_gtid_state.c_ptr_quick(), + connect_gtid_state.length())) + { + errmsg= "Out of memory or malformed slave request when obtaining start " + "position from GTID state"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name))) + { + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + pos= 4; + } else - name=0; // Find first log + { + if (log_ident[0]) + mysql_bin_log.make_log_name(search_file_name, log_ident); + else + name=0; // Find first log + } linfo.index_file_offset = 0; @@ -1036,7 +1373,8 @@ impossible position"; if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, log_file_name, &log, mariadb_slave_capability, 
ev_offset, - current_checksum_alg))) + current_checksum_alg, using_gtid_state, + &gtid_state, &gtid_skip_group))) { errmsg= tmp_msg; my_errno= ER_UNKNOWN_ERROR; @@ -1197,7 +1535,9 @@ impossible position"; (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, log_file_name, &log, mariadb_slave_capability, ev_offset, - current_checksum_alg))) + current_checksum_alg, + using_gtid_state, &gtid_state, + &gtid_skip_group))) { errmsg= tmp_msg; my_errno= ER_UNKNOWN_ERROR; |