summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc554
1 files changed, 539 insertions, 15 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 529f702f8a9..104ea948cfc 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -537,7 +537,7 @@ static char *slave_load_file_stem(char *buf, uint file_id,
to_unix_path(buf);
buf = strend(buf);
- buf = int10_to_str(::server_id, buf, 10);
+ buf = int10_to_str(global_system_variables.server_id, buf, 10);
*buf++ = '-';
buf = int10_to_str(event_server_id, buf, 10);
*buf++ = '-';
@@ -573,7 +573,7 @@ static void cleanup_load_tmpdir()
LOAD DATA.
*/
p= strmake(prefbuf, STRING_WITH_LEN(PREFIX_SQL_LOAD));
- p= int10_to_str(::server_id, p, 10);
+ p= int10_to_str(global_system_variables.server_id, p, 10);
*(p++)= '-';
*p= 0;
@@ -749,6 +749,8 @@ const char* Log_event::get_type_str(Log_event_type type)
case INCIDENT_EVENT: return "Incident";
case ANNOTATE_ROWS_EVENT: return "Annotate_rows";
case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint";
+ case GTID_EVENT: return "Gtid";
+ case GTID_LIST_EVENT: return "Gtid_list";
default: return "Unknown"; /* impossible */
}
}
@@ -769,7 +771,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
crc(0), thd(thd_arg),
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
- server_id= thd->server_id;
+ server_id= thd->variables.server_id;
when= thd->start_time;
when_sec_part=thd->start_time_sec_part;
@@ -794,7 +796,7 @@ Log_event::Log_event()
cache_type(Log_event::EVENT_INVALID_CACHE), crc(0),
thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
- server_id= ::server_id;
+ server_id= global_system_variables.server_id;
/*
We can't call my_time() here as this would cause a call before
my_init() is called
@@ -909,9 +911,11 @@ int Log_event::do_update_pos(Relay_log_info *rli)
if (debug_not_change_ts_if_art_event == 1
&& is_artificial_event())
debug_not_change_ts_if_art_event= 0; );
- rli->stmt_done(log_pos, is_artificial_event() &&
- IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ?
- 0 : when);
+ rli->stmt_done(log_pos,
+ (is_artificial_event() &&
+ IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ?
+ 0 : when),
+ thd);
DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp",
if (debug_not_change_ts_if_art_event == 0)
debug_not_change_ts_if_art_event= 2; );
@@ -926,10 +930,11 @@ Log_event::do_shall_skip(Relay_log_info *rli)
DBUG_PRINT("info", ("ev->server_id: %lu, ::server_id: %lu,"
" rli->replicate_same_server_id: %d,"
" rli->slave_skip_counter: %lu",
- (ulong) server_id, (ulong) ::server_id,
+ (ulong) server_id, (ulong) global_system_variables.server_id,
rli->replicate_same_server_id,
rli->slave_skip_counter));
- if ((server_id == ::server_id && !rli->replicate_same_server_id) ||
+ if ((server_id == global_system_variables.server_id &&
+ !rli->replicate_same_server_id) ||
(rli->slave_skip_counter == 1 && rli->is_in_group()) ||
(flags & LOG_EVENT_SKIP_REPLICATION_F &&
opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE))
@@ -1560,6 +1565,12 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case BINLOG_CHECKPOINT_EVENT:
ev = new Binlog_checkpoint_log_event(buf, event_len, description_event);
break;
+ case GTID_EVENT:
+ ev = new Gtid_log_event(buf, event_len, description_event);
+ break;
+ case GTID_LIST_EVENT:
+ ev = new Gtid_list_log_event(buf, event_len, description_event);
+ break;
#ifdef HAVE_REPLICATION
case SLAVE_EVENT: /* can never happen (unused event) */
ev = new Slave_log_event(buf, event_len, description_event);
@@ -3437,6 +3448,53 @@ Query_log_event::dummy_event(String *packet, ulong ev_offset,
return 0;
}
+/*
+ Replace an event (GTID event) with a BEGIN query event, to be compatible
+ with an old slave.
+*/
+int
+Query_log_event::begin_event(String *packet, ulong ev_offset,
+ uint8 checksum_alg)
+{
+ uchar *p= (uchar *)packet->ptr() + ev_offset;
+ uchar *q= p + LOG_EVENT_HEADER_LEN;
+ size_t data_len= packet->length() - ev_offset;
+ uint16 flags;
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ data_len-= BINLOG_CHECKSUM_LEN;
+ else
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+
+ /* Currently we only need to replace GTID event. */
+ DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN);
+ if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ return 1;
+
+ flags= uint2korr(p + FLAGS_OFFSET);
+ flags&= ~LOG_EVENT_THREAD_SPECIFIC_F;
+ flags|= LOG_EVENT_SUPPRESS_USE_F;
+ int2store(p + FLAGS_OFFSET, flags);
+
+ p[EVENT_TYPE_OFFSET]= QUERY_EVENT;
+ int4store(q + Q_THREAD_ID_OFFSET, 0);
+ int4store(q + Q_EXEC_TIME_OFFSET, 0);
+ q[Q_DB_LEN_OFFSET]= 0;
+ int2store(q + Q_ERR_CODE_OFFSET, 0);
+ int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
+ q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
+ q+= Q_DATA_OFFSET + 1;
+ memcpy(q, "BEGIN", 5);
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ {
+ ha_checksum crc= my_checksum(0L, p, data_len);
+ int4store(p + data_len, crc);
+ }
+ return 0;
+}
+
#ifdef MYSQL_CLIENT
/**
@@ -3696,6 +3754,8 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
LEX_STRING new_db;
int expected_error,actual_error= 0;
HA_CREATE_INFO db_options;
+ uint64 sub_id= 0;
+ rpl_gtid gtid;
DBUG_ENTER("Query_log_event::do_apply_event");
/*
@@ -3883,6 +3943,30 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
else
thd->variables.collation_database= thd->db_charset;
+ /*
+ Record any GTID in the same transaction, so slave state is
+ transactionally consistent.
+ */
+ if (strcmp("COMMIT", query) == 0 && (sub_id= rli->gtid_sub_id))
+ {
+ /* Clear the GTID from the RLI so we don't accidentally reuse it. */
+ const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0;
+
+ gtid= rli->current_gtid;
+ if (rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true))
+ {
+ rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
+ "Error during COMMIT: failed to update GTID state in "
+ "%s.%s: %d: %s",
+ "mysql", rpl_gtid_slave_state_table_name.str,
+ thd->stmt_da->sql_errno(), thd->stmt_da->message());
+ trans_rollback(thd);
+ sub_id= 0;
+ thd->is_slave_error= 1;
+ goto end;
+ }
+ }
+
thd->table_map_for_update= (table_map)table_map_for_update;
thd->set_invoker(&user, &host);
/*
@@ -4068,6 +4152,9 @@ Default database: '%s'. Query: '%s'",
}
end:
+ if (sub_id && !thd->is_slave_error)
+ rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
+
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
in the data_buf of this event. Now the event is going to be deleted
@@ -4145,6 +4232,17 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
DBUG_RETURN(Log_event::do_shall_skip(rli));
}
+
+bool
+Query_log_event::peek_is_commit_rollback(const char *event_start,
+ size_t event_len)
+{
+ if (event_len < LOG_EVENT_HEADER_LEN + QUERY_HEADER_LEN || event_len < 9)
+ return false;
+ return !memcmp(event_start + (event_len-7), "\0COMMIT", 7) ||
+ !memcmp(event_start + (event_len-9), "\0ROLLBACK", 9);
+}
+
#endif
@@ -4459,6 +4557,8 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN;
post_header_len[BINLOG_CHECKPOINT_EVENT-1]=
BINLOG_CHECKPOINT_HEADER_LEN;
+ post_header_len[GTID_EVENT-1]= GTID_HEADER_LEN;
+ post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN;
// Sanity-check that all post header lengths are initialized.
int i;
@@ -4663,7 +4763,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
perform, we don't call Start_log_event_v3::do_apply_event()
(this was just to update the log's description event).
*/
- if (server_id != (uint32) ::server_id)
+ if (server_id != (uint32) global_system_variables.server_id)
{
/*
If the event was not requested by the slave i.e. the master sent
@@ -4689,7 +4789,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
{
- if (server_id == (uint32) ::server_id)
+ if (server_id == (uint32) global_system_variables.server_id)
{
/*
We only increase the relay log position if we are skipping
@@ -5744,7 +5844,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
#endif
DBUG_PRINT("info", ("server_id=%lu; ::server_id=%lu",
- (ulong) this->server_id, (ulong) ::server_id));
+ (ulong) this->server_id, (ulong) global_system_variables.server_id));
DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident));
DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf)));
@@ -5764,7 +5864,8 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
5.0.0, there also are some rotates from the slave itself, in the
relay log, which shall not change the group positions.
*/
- if ((server_id != ::server_id || rli->replicate_same_server_id) &&
+ if ((server_id != global_system_variables.server_id ||
+ rli->replicate_same_server_id) &&
!is_relay_log_event() &&
!rli->is_in_group())
{
@@ -5781,6 +5882,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
rli->group_master_log_name,
(ulong) rli->group_master_log_pos));
mysql_mutex_unlock(&rli->data_lock);
+ rpl_global_gtid_slave_state.record_and_update_gtid(thd, rli);
flush_relay_log_info(rli);
/*
@@ -5905,6 +6007,394 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file)
/**************************************************************************
+ Global transaction ID stuff
+**************************************************************************/
+
+Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event)
+ : Log_event(buf, description_event), seq_no(0)
+{
+ uint8 header_size= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1];
+ if (event_len < header_size + post_header_len ||
+ post_header_len < GTID_HEADER_LEN)
+ return;
+
+ buf+= header_size;
+ seq_no= uint8korr(buf);
+ buf+= 8;
+ domain_id= uint4korr(buf);
+ buf+= 4;
+ flags2= *buf;
+}
+
+
+#ifdef MYSQL_SERVER
+
+Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
+ uint32 domain_id_arg, bool standalone,
+ uint16 flags_arg, bool is_transactional)
+ : Log_event(thd_arg, flags_arg, is_transactional),
+ seq_no(seq_no_arg), domain_id(domain_id_arg),
+ flags2(standalone ? FL_STANDALONE : 0)
+{
+ cache_type= Log_event::EVENT_NO_CACHE;
+}
+
+
+/*
+ Used to record GTID while sending binlog to slave, without having to
+ fully contruct every Gtid_log_event() needlessly.
+*/
+bool
+Gtid_log_event::peek(const char *event_start, size_t event_len,
+ uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
+ uchar *flags2)
+{
+ const char *p;
+ if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ return true;
+ *server_id= uint4korr(event_start + SERVER_ID_OFFSET);
+ p= event_start + LOG_EVENT_HEADER_LEN;
+ *seq_no= uint8korr(p);
+ p+= 8;
+ *domain_id= uint4korr(p);
+ p+= 4;
+ *flags2= (uchar)*p;
+ return false;
+}
+
+
+bool
+Gtid_log_event::write(IO_CACHE *file)
+{
+ uchar buf[GTID_HEADER_LEN];
+ int8store(buf, seq_no);
+ int4store(buf+8, domain_id);
+ buf[12]= flags2;
+ bzero(buf+13, GTID_HEADER_LEN-13);
+ return write_header(file, GTID_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, buf, GTID_HEADER_LEN) ||
+ write_footer(file);
+}
+
+
+/*
+ Replace a GTID event with either a BEGIN event, dummy event, or nothing, as
+ appropriate to work with old slave that does not know global transaction id.
+
+ The need_dummy_event argument is an IN/OUT argument. It is passed as TRUE
+ if slave has capability lower than MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES.
+ It is returned TRUE if we return a BEGIN (or dummy) event to be sent to the
+ slave, FALSE if event should be skipped completely.
+*/
+int
+Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
+ ulong ev_offset, uint8 checksum_alg)
+{
+ uchar flags2;
+ if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ return 1;
+ flags2= (*packet)[ev_offset + LOG_EVENT_HEADER_LEN + 12];
+ if (flags2 & FL_STANDALONE)
+ {
+ if (need_dummy_event)
+ return Query_log_event::dummy_event(packet, ev_offset, checksum_alg);
+ else
+ return 0;
+ }
+
+ *need_dummy_event= true;
+ return Query_log_event::begin_event(packet, ev_offset, checksum_alg);
+}
+
+
+#ifdef HAVE_REPLICATION
+void
+Gtid_log_event::pack_info(THD *thd, Protocol *protocol)
+{
+ char buf[6+5+10+1+10+1+20+1];
+ char *p;
+ p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID "));
+ p= longlong10_to_str(domain_id, p, 10);
+ *p++= '-';
+ p= longlong10_to_str(server_id, p, 10);
+ *p++= '-';
+ p= longlong10_to_str(seq_no, p, 10);
+
+ protocol->store(buf, p-buf, &my_charset_bin);
+}
+
+static char gtid_begin_string[] = "BEGIN";
+
+int
+Gtid_log_event::do_apply_event(Relay_log_info const *rli)
+{
+ thd->variables.server_id= this->server_id;
+ thd->variables.gtid_domain_id= this->domain_id;
+ thd->variables.gtid_seq_no= this->seq_no;
+
+ if (flags2 & FL_STANDALONE)
+ return 0;
+
+ /* Execute this like a BEGIN query event. */
+ thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1,
+ &my_charset_bin, next_query_id());
+ Parser_state parser_state;
+ if (!parser_state.init(thd, thd->query(), thd->query_length()))
+ {
+ mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
+ /* Finalize server status flags after executing a statement. */
+ thd->update_server_status();
+ log_slow_statement(thd);
+ if (unlikely(thd->is_fatal_error))
+ thd->is_slave_error= 1;
+ else if (likely(!thd->is_slave_error))
+ general_log_write(thd, COM_QUERY, thd->query(), thd->query_length());
+ }
+
+ thd->reset_query();
+ free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
+ return thd->is_slave_error;
+}
+
+
+int
+Gtid_log_event::do_update_pos(Relay_log_info *rli)
+{
+ rli->inc_event_relay_log_pos();
+ return 0;
+}
+
+
+Log_event::enum_skip_reason
+Gtid_log_event::do_shall_skip(Relay_log_info *rli)
+{
+ /*
+ An event skipped due to @@skip_replication must not be counted towards the
+ number of events to be skipped due to @@sql_slave_skip_counter.
+ */
+ if (flags & LOG_EVENT_SKIP_REPLICATION_F &&
+ opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE)
+ return Log_event::EVENT_SKIP_IGNORE;
+
+ if (rli->slave_skip_counter > 0)
+ {
+ if (!(flags2 & FL_STANDALONE))
+ thd->variables.option_bits|= OPTION_BEGIN;
+ return Log_event::continue_group(rli);
+ }
+ return Log_event::do_shall_skip(rli);
+}
+
+
+#endif /* HAVE_REPLICATION */
+
+#else /* !MYSQL_SERVER */
+
+void
+Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
+{
+ Write_on_release_cache cache(&print_event_info->head_cache, file,
+ Write_on_release_cache::FLUSH_F);
+ char buf[21];
+
+ if (!print_event_info->short_form)
+ {
+ print_header(&cache, print_event_info, FALSE);
+ longlong10_to_str(seq_no, buf, 10);
+ my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf);
+
+ if (!print_event_info->domain_id_printed ||
+ print_event_info->domain_id != domain_id)
+ {
+ my_b_printf(&cache, "/*!100001 SET @@session.gtid_domain_id=%u*/%s\n",
+ domain_id, print_event_info->delimiter);
+ print_event_info->domain_id= domain_id;
+ print_event_info->domain_id_printed= true;
+ }
+
+ if (!print_event_info->server_id_printed ||
+ print_event_info->server_id != server_id)
+ {
+ my_b_printf(&cache, "/*!100001 SET @@session.server_id=%u*/%s\n",
+ server_id, print_event_info->delimiter);
+ print_event_info->server_id= server_id;
+ print_event_info->server_id_printed= true;
+ }
+
+ my_b_printf(&cache, "/*!100001 SET @@session.gtid_seq_no=%s*/%s\n",
+ buf, print_event_info->delimiter);
+ }
+ if (!(flags2 & FL_STANDALONE))
+ my_b_printf(&cache, "BEGIN\n%s\n", print_event_info->delimiter);
+}
+
+#endif /* MYSQL_SERVER */
+
+
+/* GTID list. */
+
+Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event)
+ : Log_event(buf, description_event), count(0), list(0)
+{
+ uint32 i;
+ uint8 header_size= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1];
+ if (event_len < header_size + post_header_len ||
+ post_header_len < GTID_LIST_HEADER_LEN)
+ return;
+
+ buf+= header_size;
+ count= uint4korr(buf) & ((1<<28)-1);
+ buf+= 4;
+ if (event_len - (header_size + post_header_len) < count*element_size ||
+ (!(list= (rpl_gtid *)my_malloc(count*sizeof(*list) + (count == 0),
+ MYF(MY_WME)))))
+ return;
+
+ for (i= 0; i < count; ++i)
+ {
+ list[i].domain_id= uint4korr(buf);
+ buf+= 4;
+ list[i].server_id= uint4korr(buf);
+ buf+= 4;
+ list[i].seq_no= uint8korr(buf);
+ buf+= 8;
+ }
+}
+
+
+#ifdef MYSQL_SERVER
+
+Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
+ : count(gtid_set->count()), list(0)
+{
+ cache_type= EVENT_NO_CACHE;
+ /* Failure to allocate memory will be caught by is_valid() returning false. */
+ if (count < (1<<28) &&
+ (list = (rpl_gtid *)my_malloc(count * sizeof(*list) + (count == 0),
+ MYF(MY_WME))))
+ gtid_set->get_gtid_list(list, count);
+}
+
+bool
+Gtid_list_log_event::write(IO_CACHE *file)
+{
+ uint32 i;
+ uchar buf[element_size];
+
+ DBUG_ASSERT(count < 1<<28);
+
+ if (write_header(file, get_data_size()))
+ return 1;
+ int4store(buf, count & ((1<<28)-1));
+ if (wrapper_my_b_safe_write(file, buf, GTID_LIST_HEADER_LEN))
+ return 1;
+ for (i= 0; i < count; ++i)
+ {
+ int4store(buf, list[i].domain_id);
+ int4store(buf+4, list[i].server_id);
+ int8store(buf+8, list[i].seq_no);
+ if (wrapper_my_b_safe_write(file, buf, element_size))
+ return 1;
+ }
+ return write_footer(file);
+}
+
+
+#ifdef HAVE_REPLICATION
+void
+Gtid_list_log_event::pack_info(THD *thd, Protocol *protocol)
+{
+ char buf_mem[1024];
+ String buf(buf_mem, sizeof(buf_mem), system_charset_info);
+ uint32 i;
+ bool first;
+
+ buf.length(0);
+ buf.append(STRING_WITH_LEN("["));
+ first= true;
+ for (i= 0; i < count; ++i)
+ rpl_slave_state_tostring_helper(&buf, &list[i], &first);
+ buf.append(STRING_WITH_LEN("]"));
+
+ protocol->store(&buf);
+}
+#endif /* HAVE_REPLICATION */
+
+#else /* !MYSQL_SERVER */
+
+void
+Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
+{
+ if (!print_event_info->short_form)
+ {
+ Write_on_release_cache cache(&print_event_info->head_cache, file,
+ Write_on_release_cache::FLUSH_F);
+ char buf[21];
+ uint32 i;
+
+ print_header(&cache, print_event_info, FALSE);
+ my_b_printf(&cache, "\tGtid list [");
+ for (i= 0; i < count; ++i)
+ {
+ longlong10_to_str(list[i].seq_no, buf, 10);
+ my_b_printf(&cache, "%u-%u-%s", list[i].domain_id,
+ list[i].server_id, buf);
+ if (i < count-1)
+ my_b_printf(&cache, ",\n# ");
+ }
+ my_b_printf(&cache, "]\n");
+ }
+}
+
+#endif /* MYSQL_SERVER */
+
+
+/*
+ Used to record gtid_list event while sending binlog to slave, without having to
+ fully contruct the event object.
+*/
+bool
+Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
+ rpl_gtid **out_gtid_list, uint32 *out_list_len)
+{
+ const char *p;
+ uint32 count_field, count;
+ rpl_gtid *gtid_list;
+
+ if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN)
+ return true;
+ p= event_start + LOG_EVENT_HEADER_LEN;
+ count_field= uint4korr(p);
+ p+= 4;
+ count= count_field & ((1<<28)-1);
+ if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN +
+ 16 * count)
+ return true;
+ if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count + (count == 0),
+ MYF(MY_WME))))
+ return true;
+ *out_gtid_list= gtid_list;
+ *out_list_len= count;
+ while (count--)
+ {
+ gtid_list->domain_id= uint4korr(p);
+ p+= 4;
+ gtid_list->server_id= uint4korr(p);
+ p+= 4;
+ gtid_list->seq_no= uint8korr(p);
+ p+= 8;
+ ++gtid_list;
+ }
+
+ return false;
+}
+
+
+/**************************************************************************
Intvar_log_event methods
**************************************************************************/
@@ -6257,12 +6747,43 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
int Xid_log_event::do_apply_event(Relay_log_info const *rli)
{
bool res;
+ int err;
+ rpl_gtid gtid;
+ uint64 sub_id;
+
+ /*
+ Record any GTID in the same transaction, so slave state is transactionally
+ consistent.
+ */
+ if ((sub_id= rli->gtid_sub_id))
+ {
+ /* Clear the GTID from the RLI so we don't accidentally reuse it. */
+ const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0;
+
+ gtid= rli->current_gtid;
+ err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true);
+ if (err)
+ {
+ rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
+ "Error during XID COMMIT: failed to update GTID state in "
+ "%s.%s: %d: %s",
+ "mysql", rpl_gtid_slave_state_table_name.str,
+ thd->stmt_da->sql_errno(), thd->stmt_da->message());
+ trans_rollback(thd);
+ thd->is_slave_error= 1;
+ return err;
+ }
+ }
+
/* For a slave Xid_log_event is COMMIT */
general_log_print(thd, COM_QUERY,
"COMMIT /* implicit, from Xid_log_event */");
res= trans_commit(thd); /* Automatically rolls back on error. */
thd->mdl_context.release_transactional_locks();
+ if (sub_id)
+ rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
+
/*
Increment the global status commit count variable
*/
@@ -7009,6 +7530,7 @@ int Stop_log_event::do_update_pos(Relay_log_info *rli)
rli->inc_event_relay_log_pos();
else
{
+ rpl_global_gtid_slave_state.record_and_update_gtid(thd, rli);
rli->inc_group_relay_log_pos(0);
flush_relay_log_info(rli);
}
@@ -8810,7 +9332,7 @@ Rows_log_event::do_update_pos(Relay_log_info *rli)
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
- rli->stmt_done(log_pos, when);
+ rli->stmt_done(log_pos, when, thd);
/*
Clear any errors in thd->net.last_err*. It is not known if this is
needed or not. It is believed that any errors that may exist in
@@ -11148,7 +11670,9 @@ st_print_event_info::st_print_event_info()
auto_increment_increment(0),auto_increment_offset(0), charset_inited(0),
lc_time_names_number(~0),
charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
- thread_id(0), thread_id_printed(false), skip_replication(0),
+ thread_id(0), thread_id_printed(false), server_id(0),
+ server_id_printed(false), domain_id(0), domain_id_printed(false),
+ skip_replication(0),
base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE)
{
/*