diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 554 |
1 files changed, 539 insertions, 15 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 529f702f8a9..104ea948cfc 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -537,7 +537,7 @@ static char *slave_load_file_stem(char *buf, uint file_id, to_unix_path(buf); buf = strend(buf); - buf = int10_to_str(::server_id, buf, 10); + buf = int10_to_str(global_system_variables.server_id, buf, 10); *buf++ = '-'; buf = int10_to_str(event_server_id, buf, 10); *buf++ = '-'; @@ -573,7 +573,7 @@ static void cleanup_load_tmpdir() LOAD DATA. */ p= strmake(prefbuf, STRING_WITH_LEN(PREFIX_SQL_LOAD)); - p= int10_to_str(::server_id, p, 10); + p= int10_to_str(global_system_variables.server_id, p, 10); *(p++)= '-'; *p= 0; @@ -749,6 +749,8 @@ const char* Log_event::get_type_str(Log_event_type type) case INCIDENT_EVENT: return "Incident"; case ANNOTATE_ROWS_EVENT: return "Annotate_rows"; case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint"; + case GTID_EVENT: return "Gtid"; + case GTID_LIST_EVENT: return "Gtid_list"; default: return "Unknown"; /* impossible */ } } @@ -769,7 +771,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) crc(0), thd(thd_arg), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) { - server_id= thd->server_id; + server_id= thd->variables.server_id; when= thd->start_time; when_sec_part=thd->start_time_sec_part; @@ -794,7 +796,7 @@ Log_event::Log_event() cache_type(Log_event::EVENT_INVALID_CACHE), crc(0), thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) { - server_id= ::server_id; + server_id= global_system_variables.server_id; /* We can't call my_time() here as this would cause a call before my_init() is called @@ -909,9 +911,11 @@ int Log_event::do_update_pos(Relay_log_info *rli) if (debug_not_change_ts_if_art_event == 1 && is_artificial_event()) debug_not_change_ts_if_art_event= 0; ); - rli->stmt_done(log_pos, is_artificial_event() && - IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ? - 0 : when); + rli->stmt_done(log_pos, + (is_artificial_event() && + IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ? + 0 : when), + thd); DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp", if (debug_not_change_ts_if_art_event == 0) debug_not_change_ts_if_art_event= 2; ); @@ -926,10 +930,11 @@ Log_event::do_shall_skip(Relay_log_info *rli) DBUG_PRINT("info", ("ev->server_id: %lu, ::server_id: %lu," " rli->replicate_same_server_id: %d," " rli->slave_skip_counter: %lu", - (ulong) server_id, (ulong) ::server_id, + (ulong) server_id, (ulong) global_system_variables.server_id, rli->replicate_same_server_id, rli->slave_skip_counter)); - if ((server_id == ::server_id && !rli->replicate_same_server_id) || + if ((server_id == global_system_variables.server_id && + !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group()) || (flags & LOG_EVENT_SKIP_REPLICATION_F && opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE)) @@ -1560,6 +1565,12 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case BINLOG_CHECKPOINT_EVENT: ev = new Binlog_checkpoint_log_event(buf, event_len, description_event); break; + case GTID_EVENT: + ev = new Gtid_log_event(buf, event_len, description_event); + break; + case GTID_LIST_EVENT: + ev = new Gtid_list_log_event(buf, event_len, description_event); + break; #ifdef HAVE_REPLICATION case SLAVE_EVENT: /* can never happen (unused event) */ ev = new Slave_log_event(buf, event_len, description_event); @@ -3437,6 +3448,53 @@ Query_log_event::dummy_event(String *packet, ulong ev_offset, return 0; } +/* + Replace an event (GTID event) with a BEGIN query event, to be compatible + with an old slave. +*/ +int +Query_log_event::begin_event(String *packet, ulong ev_offset, + uint8 checksum_alg) +{ + uchar *p= (uchar *)packet->ptr() + ev_offset; + uchar *q= p + LOG_EVENT_HEADER_LEN; + size_t data_len= packet->length() - ev_offset; + uint16 flags; + + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + data_len-= BINLOG_CHECKSUM_LEN; + else + DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + checksum_alg == BINLOG_CHECKSUM_ALG_OFF); + + /* Currently we only need to replace GTID event. */ + DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN); + if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + return 1; + + flags= uint2korr(p + FLAGS_OFFSET); + flags&= ~LOG_EVENT_THREAD_SPECIFIC_F; + flags|= LOG_EVENT_SUPPRESS_USE_F; + int2store(p + FLAGS_OFFSET, flags); + + p[EVENT_TYPE_OFFSET]= QUERY_EVENT; + int4store(q + Q_THREAD_ID_OFFSET, 0); + int4store(q + Q_EXEC_TIME_OFFSET, 0); + q[Q_DB_LEN_OFFSET]= 0; + int2store(q + Q_ERR_CODE_OFFSET, 0); + int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0); + q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */ + q+= Q_DATA_OFFSET + 1; + memcpy(q, "BEGIN", 5); + + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + { + ha_checksum crc= my_checksum(0L, p, data_len); + int4store(p + data_len, crc); + } + return 0; +} + #ifdef MYSQL_CLIENT /** @@ -3696,6 +3754,8 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, LEX_STRING new_db; int expected_error,actual_error= 0; HA_CREATE_INFO db_options; + uint64 sub_id= 0; + rpl_gtid gtid; DBUG_ENTER("Query_log_event::do_apply_event"); /* @@ -3883,6 +3943,30 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, else thd->variables.collation_database= thd->db_charset; + /* + Record any GTID in the same transaction, so slave state is + transactionally consistent. + */ + if (strcmp("COMMIT", query) == 0 && (sub_id= rli->gtid_sub_id)) + { + /* Clear the GTID from the RLI so we don't accidentally reuse it. */ + const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0; + + gtid= rli->current_gtid; + if (rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true)) + { + rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, + "Error during COMMIT: failed to update GTID state in " + "%s.%s: %d: %s", + "mysql", rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); + trans_rollback(thd); + sub_id= 0; + thd->is_slave_error= 1; + goto end; + } + } + thd->table_map_for_update= (table_map)table_map_for_update; thd->set_invoker(&user, &host); /* @@ -4068,6 +4152,9 @@ Default database: '%s'. Query: '%s'", } end: + if (sub_id && !thd->is_slave_error) + rpl_global_gtid_slave_state.update_state_hash(sub_id, >id); + /* Probably we have set thd->query, thd->db, thd->catalog to point to places in the data_buf of this event. Now the event is going to be deleted @@ -4145,6 +4232,17 @@ Query_log_event::do_shall_skip(Relay_log_info *rli) DBUG_RETURN(Log_event::do_shall_skip(rli)); } + +bool +Query_log_event::peek_is_commit_rollback(const char *event_start, + size_t event_len) +{ + if (event_len < LOG_EVENT_HEADER_LEN + QUERY_HEADER_LEN || event_len < 9) + return false; + return !memcmp(event_start + (event_len-7), "\0COMMIT", 7) || + !memcmp(event_start + (event_len-9), "\0ROLLBACK", 9); +} + #endif @@ -4459,6 +4557,8 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN; post_header_len[BINLOG_CHECKPOINT_EVENT-1]= BINLOG_CHECKPOINT_HEADER_LEN; + post_header_len[GTID_EVENT-1]= GTID_HEADER_LEN; + post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN; // Sanity-check that all post header lengths are initialized. int i; @@ -4663,7 +4763,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli) perform, we don't call Start_log_event_v3::do_apply_event() (this was just to update the log's description event). */ - if (server_id != (uint32) ::server_id) + if (server_id != (uint32) global_system_variables.server_id) { /* If the event was not requested by the slave i.e. the master sent @@ -4689,7 +4789,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli) int Format_description_log_event::do_update_pos(Relay_log_info *rli) { - if (server_id == (uint32) ::server_id) + if (server_id == (uint32) global_system_variables.server_id) { /* We only increase the relay log position if we are skipping @@ -5744,7 +5844,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) #endif DBUG_PRINT("info", ("server_id=%lu; ::server_id=%lu", - (ulong) this->server_id, (ulong) ::server_id)); + (ulong) this->server_id, (ulong) global_system_variables.server_id)); DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident)); DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf))); @@ -5764,7 +5864,8 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) 5.0.0, there also are some rotates from the slave itself, in the relay log, which shall not change the group positions. */ - if ((server_id != ::server_id || rli->replicate_same_server_id) && + if ((server_id != global_system_variables.server_id || + rli->replicate_same_server_id) && !is_relay_log_event() && !rli->is_in_group()) { @@ -5781,6 +5882,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) rli->group_master_log_name, (ulong) rli->group_master_log_pos)); mysql_mutex_unlock(&rli->data_lock); + rpl_global_gtid_slave_state.record_and_update_gtid(thd, rli); flush_relay_log_info(rli); /* @@ -5905,6 +6007,394 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file) /************************************************************************** + Global transaction ID stuff +**************************************************************************/ + +Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event) + : Log_event(buf, description_event), seq_no(0) +{ + uint8 header_size= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1]; + if (event_len < header_size + post_header_len || + post_header_len < GTID_HEADER_LEN) + return; + + buf+= header_size; + seq_no= uint8korr(buf); + buf+= 8; + domain_id= uint4korr(buf); + buf+= 4; + flags2= *buf; +} + + +#ifdef MYSQL_SERVER + +Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, + uint32 domain_id_arg, bool standalone, + uint16 flags_arg, bool is_transactional) + : Log_event(thd_arg, flags_arg, is_transactional), + seq_no(seq_no_arg), domain_id(domain_id_arg), + flags2(standalone ? FL_STANDALONE : 0) +{ + cache_type= Log_event::EVENT_NO_CACHE; +} + + +/* + Used to record GTID while sending binlog to slave, without having to + fully contruct every Gtid_log_event() needlessly. +*/ +bool +Gtid_log_event::peek(const char *event_start, size_t event_len, + uint32 *domain_id, uint32 *server_id, uint64 *seq_no, + uchar *flags2) +{ + const char *p; + if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + return true; + *server_id= uint4korr(event_start + SERVER_ID_OFFSET); + p= event_start + LOG_EVENT_HEADER_LEN; + *seq_no= uint8korr(p); + p+= 8; + *domain_id= uint4korr(p); + p+= 4; + *flags2= (uchar)*p; + return false; +} + + +bool +Gtid_log_event::write(IO_CACHE *file) +{ + uchar buf[GTID_HEADER_LEN]; + int8store(buf, seq_no); + int4store(buf+8, domain_id); + buf[12]= flags2; + bzero(buf+13, GTID_HEADER_LEN-13); + return write_header(file, GTID_HEADER_LEN) || + wrapper_my_b_safe_write(file, buf, GTID_HEADER_LEN) || + write_footer(file); +} + + +/* + Replace a GTID event with either a BEGIN event, dummy event, or nothing, as + appropriate to work with old slave that does not know global transaction id. + + The need_dummy_event argument is an IN/OUT argument. It is passed as TRUE + if slave has capability lower than MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES. + It is returned TRUE if we return a BEGIN (or dummy) event to be sent to the + slave, FALSE if event should be skipped completely. +*/ +int +Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event, + ulong ev_offset, uint8 checksum_alg) +{ + uchar flags2; + if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + return 1; + flags2= (*packet)[ev_offset + LOG_EVENT_HEADER_LEN + 12]; + if (flags2 & FL_STANDALONE) + { + if (need_dummy_event) + return Query_log_event::dummy_event(packet, ev_offset, checksum_alg); + else + return 0; + } + + *need_dummy_event= true; + return Query_log_event::begin_event(packet, ev_offset, checksum_alg); +} + + +#ifdef HAVE_REPLICATION +void +Gtid_log_event::pack_info(THD *thd, Protocol *protocol) +{ + char buf[6+5+10+1+10+1+20+1]; + char *p; + p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID ")); + p= longlong10_to_str(domain_id, p, 10); + *p++= '-'; + p= longlong10_to_str(server_id, p, 10); + *p++= '-'; + p= longlong10_to_str(seq_no, p, 10); + + protocol->store(buf, p-buf, &my_charset_bin); +} + +static char gtid_begin_string[] = "BEGIN"; + +int +Gtid_log_event::do_apply_event(Relay_log_info const *rli) +{ + thd->variables.server_id= this->server_id; + thd->variables.gtid_domain_id= this->domain_id; + thd->variables.gtid_seq_no= this->seq_no; + + if (flags2 & FL_STANDALONE) + return 0; + + /* Execute this like a BEGIN query event. */ + thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1, + &my_charset_bin, next_query_id()); + Parser_state parser_state; + if (!parser_state.init(thd, thd->query(), thd->query_length())) + { + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); + /* Finalize server status flags after executing a statement. */ + thd->update_server_status(); + log_slow_statement(thd); + if (unlikely(thd->is_fatal_error)) + thd->is_slave_error= 1; + else if (likely(!thd->is_slave_error)) + general_log_write(thd, COM_QUERY, thd->query(), thd->query_length()); + } + + thd->reset_query(); + free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); + return thd->is_slave_error; +} + + +int +Gtid_log_event::do_update_pos(Relay_log_info *rli) +{ + rli->inc_event_relay_log_pos(); + return 0; +} + + +Log_event::enum_skip_reason +Gtid_log_event::do_shall_skip(Relay_log_info *rli) +{ + /* + An event skipped due to @@skip_replication must not be counted towards the + number of events to be skipped due to @@sql_slave_skip_counter. + */ + if (flags & LOG_EVENT_SKIP_REPLICATION_F && + opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE) + return Log_event::EVENT_SKIP_IGNORE; + + if (rli->slave_skip_counter > 0) + { + if (!(flags2 & FL_STANDALONE)) + thd->variables.option_bits|= OPTION_BEGIN; + return Log_event::continue_group(rli); + } + return Log_event::do_shall_skip(rli); +} + + +#endif /* HAVE_REPLICATION */ + +#else /* !MYSQL_SERVER */ + +void +Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) +{ + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + char buf[21]; + + if (!print_event_info->short_form) + { + print_header(&cache, print_event_info, FALSE); + longlong10_to_str(seq_no, buf, 10); + my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf); + + if (!print_event_info->domain_id_printed || + print_event_info->domain_id != domain_id) + { + my_b_printf(&cache, "/*!100001 SET @@session.gtid_domain_id=%u*/%s\n", + domain_id, print_event_info->delimiter); + print_event_info->domain_id= domain_id; + print_event_info->domain_id_printed= true; + } + + if (!print_event_info->server_id_printed || + print_event_info->server_id != server_id) + { + my_b_printf(&cache, "/*!100001 SET @@session.server_id=%u*/%s\n", + server_id, print_event_info->delimiter); + print_event_info->server_id= server_id; + print_event_info->server_id_printed= true; + } + + my_b_printf(&cache, "/*!100001 SET @@session.gtid_seq_no=%s*/%s\n", + buf, print_event_info->delimiter); + } + if (!(flags2 & FL_STANDALONE)) + my_b_printf(&cache, "BEGIN\n%s\n", print_event_info->delimiter); +} + +#endif /* MYSQL_SERVER */ + + +/* GTID list. */ + +Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event) + : Log_event(buf, description_event), count(0), list(0) +{ + uint32 i; + uint8 header_size= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1]; + if (event_len < header_size + post_header_len || + post_header_len < GTID_LIST_HEADER_LEN) + return; + + buf+= header_size; + count= uint4korr(buf) & ((1<<28)-1); + buf+= 4; + if (event_len - (header_size + post_header_len) < count*element_size || + (!(list= (rpl_gtid *)my_malloc(count*sizeof(*list) + (count == 0), + MYF(MY_WME))))) + return; + + for (i= 0; i < count; ++i) + { + list[i].domain_id= uint4korr(buf); + buf+= 4; + list[i].server_id= uint4korr(buf); + buf+= 4; + list[i].seq_no= uint8korr(buf); + buf+= 8; + } +} + + +#ifdef MYSQL_SERVER + +Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set) + : count(gtid_set->count()), list(0) +{ + cache_type= EVENT_NO_CACHE; + /* Failure to allocate memory will be caught by is_valid() returning false. */ + if (count < (1<<28) && + (list = (rpl_gtid *)my_malloc(count * sizeof(*list) + (count == 0), + MYF(MY_WME)))) + gtid_set->get_gtid_list(list, count); +} + +bool +Gtid_list_log_event::write(IO_CACHE *file) +{ + uint32 i; + uchar buf[element_size]; + + DBUG_ASSERT(count < 1<<28); + + if (write_header(file, get_data_size())) + return 1; + int4store(buf, count & ((1<<28)-1)); + if (wrapper_my_b_safe_write(file, buf, GTID_LIST_HEADER_LEN)) + return 1; + for (i= 0; i < count; ++i) + { + int4store(buf, list[i].domain_id); + int4store(buf+4, list[i].server_id); + int8store(buf+8, list[i].seq_no); + if (wrapper_my_b_safe_write(file, buf, element_size)) + return 1; + } + return write_footer(file); +} + + +#ifdef HAVE_REPLICATION +void +Gtid_list_log_event::pack_info(THD *thd, Protocol *protocol) +{ + char buf_mem[1024]; + String buf(buf_mem, sizeof(buf_mem), system_charset_info); + uint32 i; + bool first; + + buf.length(0); + buf.append(STRING_WITH_LEN("[")); + first= true; + for (i= 0; i < count; ++i) + rpl_slave_state_tostring_helper(&buf, &list[i], &first); + buf.append(STRING_WITH_LEN("]")); + + protocol->store(&buf); +} +#endif /* HAVE_REPLICATION */ + +#else /* !MYSQL_SERVER */ + +void +Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) +{ + if (!print_event_info->short_form) + { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + char buf[21]; + uint32 i; + + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tGtid list ["); + for (i= 0; i < count; ++i) + { + longlong10_to_str(list[i].seq_no, buf, 10); + my_b_printf(&cache, "%u-%u-%s", list[i].domain_id, + list[i].server_id, buf); + if (i < count-1) + my_b_printf(&cache, ",\n# "); + } + my_b_printf(&cache, "]\n"); + } +} + +#endif /* MYSQL_SERVER */ + + +/* + Used to record gtid_list event while sending binlog to slave, without having to + fully contruct the event object. +*/ +bool +Gtid_list_log_event::peek(const char *event_start, uint32 event_len, + rpl_gtid **out_gtid_list, uint32 *out_list_len) +{ + const char *p; + uint32 count_field, count; + rpl_gtid *gtid_list; + + if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN) + return true; + p= event_start + LOG_EVENT_HEADER_LEN; + count_field= uint4korr(p); + p+= 4; + count= count_field & ((1<<28)-1); + if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN + + 16 * count) + return true; + if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count + (count == 0), + MYF(MY_WME)))) + return true; + *out_gtid_list= gtid_list; + *out_list_len= count; + while (count--) + { + gtid_list->domain_id= uint4korr(p); + p+= 4; + gtid_list->server_id= uint4korr(p); + p+= 4; + gtid_list->seq_no= uint8korr(p); + p+= 8; + ++gtid_list; + } + + return false; +} + + +/************************************************************************** Intvar_log_event methods **************************************************************************/ @@ -6257,12 +6747,43 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) int Xid_log_event::do_apply_event(Relay_log_info const *rli) { bool res; + int err; + rpl_gtid gtid; + uint64 sub_id; + + /* + Record any GTID in the same transaction, so slave state is transactionally + consistent. + */ + if ((sub_id= rli->gtid_sub_id)) + { + /* Clear the GTID from the RLI so we don't accidentally reuse it. */ + const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0; + + gtid= rli->current_gtid; + err= rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true); + if (err) + { + rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, + "Error during XID COMMIT: failed to update GTID state in " + "%s.%s: %d: %s", + "mysql", rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); + trans_rollback(thd); + thd->is_slave_error= 1; + return err; + } + } + /* For a slave Xid_log_event is COMMIT */ general_log_print(thd, COM_QUERY, "COMMIT /* implicit, from Xid_log_event */"); res= trans_commit(thd); /* Automatically rolls back on error. */ thd->mdl_context.release_transactional_locks(); + if (sub_id) + rpl_global_gtid_slave_state.update_state_hash(sub_id, >id); + /* Increment the global status commit count variable */ @@ -7009,6 +7530,7 @@ int Stop_log_event::do_update_pos(Relay_log_info *rli) rli->inc_event_relay_log_pos(); else { + rpl_global_gtid_slave_state.record_and_update_gtid(thd, rli); rli->inc_group_relay_log_pos(0); flush_relay_log_info(rli); } @@ -8810,7 +9332,7 @@ Rows_log_event::do_update_pos(Relay_log_info *rli) Step the group log position if we are not in a transaction, otherwise increase the event log position. */ - rli->stmt_done(log_pos, when); + rli->stmt_done(log_pos, when, thd); /* Clear any errors in thd->net.last_err*. It is not known if this is needed or not. It is believed that any errors that may exist in @@ -11148,7 +11670,9 @@ st_print_event_info::st_print_event_info() auto_increment_increment(0),auto_increment_offset(0), charset_inited(0), lc_time_names_number(~0), charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER), - thread_id(0), thread_id_printed(false), skip_replication(0), + thread_id(0), thread_id_printed(false), server_id(0), + server_id_printed(false), domain_id(0), domain_id_printed(false), + skip_replication(0), base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE) { /* |