summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-09-04 12:22:09 +0200
committerunknown <knielsen@knielsen-hq.org>2013-09-04 12:22:09 +0200
commitada15c7a0f7947073664451c3804ab03723c657e (patch)
tree9a4366bccc5f87f1f8ad2700de1c4f5978ede620
parent378bd0442a62d1067d19c67dddc3d8b27fc8a537 (diff)
downloadmariadb-git-ada15c7a0f7947073664451c3804ab03723c657e.tar.gz
Fix various places where code would work incorrectly if the common_header_len of events is different on master and slave
Patch developed with the help of Pavel Ivanov. Also fix an uninitialised variable in queue_event().
-rw-r--r--sql/log.cc16
-rw-r--r--sql/log.h1
-rw-r--r--sql/log_event.cc29
-rw-r--r--sql/log_event.h5
-rw-r--r--sql/slave.cc35
-rw-r--r--sql/sql_repl.cc72
6 files changed, 122 insertions, 36 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 041a0b555d1..f882cd5d64c 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -4806,12 +4806,23 @@ end:
}
-bool MYSQL_BIN_LOG::append(Log_event* ev)
+bool
+MYSQL_BIN_LOG::append(Log_event *ev)
{
- bool error = 0;
+ bool res;
mysql_mutex_lock(&LOCK_log);
+ res= append_no_lock(ev);
+ mysql_mutex_unlock(&LOCK_log);
+ return res;
+}
+
+
+bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev)
+{
+ bool error = 0;
DBUG_ENTER("MYSQL_BIN_LOG::append");
+ mysql_mutex_assert_owner(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
/*
Log_event::write() is smart enough to use my_b_write() or
@@ -4829,7 +4840,6 @@ bool MYSQL_BIN_LOG::append(Log_event* ev)
if (my_b_append_tell(&log_file) > max_size)
error= new_file_without_locking();
err:
- mysql_mutex_unlock(&LOCK_log);
signal_update(); // Safe as we don't call close
DBUG_RETURN(error);
}
diff --git a/sql/log.h b/sql/log.h
index 11b9cd289f7..051ee8ea068 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -712,6 +712,7 @@ public:
*/
bool appendv(const char* buf,uint len,...);
bool append(Log_event* ev);
+ bool append_no_lock(Log_event* ev);
void mark_xids_active(ulong cookie, uint xid_count);
void mark_xid_done(ulong cookie, bool write_checkpoint);
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 45e4379710a..51fa9d77267 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -4754,16 +4754,15 @@ bool Format_description_log_event::write(IO_CACHE* file)
We don't call Start_log_event_v3::write() because this would make 2
my_b_safe_write().
*/
- uchar buff[FORMAT_DESCRIPTION_HEADER_LEN + BINLOG_CHECKSUM_ALG_DESC_LEN];
- size_t rec_size= sizeof(buff);
+ uchar buff[START_V3_HEADER_LEN+1];
+ size_t rec_size= sizeof(buff) + BINLOG_CHECKSUM_ALG_DESC_LEN +
+ number_of_event_types;
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
if (!dont_set_created)
created= get_time();
int4store(buff + ST_CREATED_OFFSET,created);
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
- memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET + 1, (uchar*) post_header_len,
- LOG_EVENT_TYPES);
/*
if checksum is requested
record the checksum-algorithm descriptor next to
@@ -4776,7 +4775,7 @@ bool Format_description_log_event::write(IO_CACHE* file)
#ifndef DBUG_OFF
data_written= 0; // to prepare for need_checksum assert
#endif
- buff[FORMAT_DESCRIPTION_HEADER_LEN]= need_checksum() ?
+ uchar checksum_byte= need_checksum() ?
checksum_alg : (uint8) BINLOG_CHECKSUM_ALG_OFF;
/*
FD of checksum-aware server is always checksum-equipped, (V) is in,
@@ -4796,7 +4795,10 @@ bool Format_description_log_event::write(IO_CACHE* file)
checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway
}
ret= (write_header(file, rec_size) ||
- wrapper_my_b_safe_write(file, buff, rec_size) ||
+ wrapper_my_b_safe_write(file, buff, sizeof(buff)) ||
+ wrapper_my_b_safe_write(file, (uchar*)post_header_len,
+ number_of_event_types) ||
+ wrapper_my_b_safe_write(file, &checksum_byte, sizeof(checksum_byte)) ||
write_footer(file));
if (no_checksum)
checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
@@ -6125,7 +6127,7 @@ bool
Gtid_log_event::peek(const char *event_start, size_t event_len,
uint8 checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
- uchar *flags2)
+ uchar *flags2, const Format_description_log_event *fdev)
{
const char *p;
@@ -6140,10 +6142,10 @@ Gtid_log_event::peek(const char *event_start, size_t event_len,
DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
- if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ if (event_len < (uint32)fdev->common_header_len + GTID_HEADER_LEN)
return true;
*server_id= uint4korr(event_start + SERVER_ID_OFFSET);
- p= event_start + LOG_EVENT_HEADER_LEN;
+ p= event_start + fdev->common_header_len;
*seq_no= uint8korr(p);
p+= 8;
*domain_id= uint4korr(p);
@@ -6581,7 +6583,8 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
bool
Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
uint8 checksum_alg,
- rpl_gtid **out_gtid_list, uint32 *out_list_len)
+ rpl_gtid **out_gtid_list, uint32 *out_list_len,
+ const Format_description_log_event *fdev)
{
const char *p;
uint32 count_field, count;
@@ -6598,13 +6601,13 @@ Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
- if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN)
+ if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN)
return true;
- p= event_start + LOG_EVENT_HEADER_LEN;
+ p= event_start + fdev->common_header_len;
count_field= uint4korr(p);
p+= 4;
count= count_field & ((1<<28)-1);
- if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN +
+ if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN +
16 * count)
return true;
if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count + (count == 0),
diff --git a/sql/log_event.h b/sql/log_event.h
index abb3b96bac4..39a6ce24036 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3118,7 +3118,7 @@ public:
static bool peek(const char *event_start, size_t event_len,
uint8 checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
- uchar *flags2);
+ uchar *flags2, const Format_description_log_event *fdev);
#endif
};
@@ -3232,7 +3232,8 @@ public:
#endif
static bool peek(const char *event_start, uint32 event_len,
uint8 checksum_alg,
- rpl_gtid **out_gtid_list, uint32 *out_list_len);
+ rpl_gtid **out_gtid_list, uint32 *out_list_len,
+ const Format_description_log_event *fdev);
};
diff --git a/sql/slave.cc b/sql/slave.cc
index 29514293c78..0a2fcb38b3d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -4925,8 +4925,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err;
}
- LINT_INIT(inc_pos);
-
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
(uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
DBUG_RETURN(queue_old_event(mi,buf,event_len));
@@ -5182,7 +5180,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
if (Gtid_log_event::peek(buf, event_len, checksum_alg,
&event_gtid.domain_id, &event_gtid.server_id,
- &event_gtid.seq_no, &dummy_flag))
+ &event_gtid.seq_no, &dummy_flag,
+ rli->relay_log.description_event_for_queue))
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
@@ -5240,15 +5239,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->gtid_current_pos.update(&mi->last_queued_gtid);
mi->events_queued_since_last_gtid= 0;
}
- if (Gtid_log_event::peek(buf, event_len, checksum_alg,
- &mi->last_queued_gtid.domain_id,
- &mi->last_queued_gtid.server_id,
- &mi->last_queued_gtid.seq_no, &dummy_flag))
- {
- error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
- goto err;
- }
+ mi->last_queued_gtid= event_gtid;
++mi->events_queued_since_last_gtid;
+ inc_pos= event_len;
}
break;
@@ -5308,6 +5301,26 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
if (unlikely(gtid_skip_enqueue))
{
mi->master_log_pos+= inc_pos;
+ if ((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT &&
+ s_id == mi->master_id)
+ {
+ /*
+ If we write this master's description event in the middle of an event
+ group due to GTID reconnect, SQL thread will think that master crashed
+ in the middle of the group and roll back the first half, so we must not.
+
+ But we still have to write an artificial copy of the masters description
+ event, to override the initial slave-version description event so that
+ SQL thread has the right information for parsing the events it reads.
+ */
+ rli->relay_log.description_event_for_queue->created= 0;
+ rli->relay_log.description_event_for_queue->set_artificial_event();
+ if (rli->relay_log.append_no_lock
+ (rli->relay_log.description_event_for_queue))
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ else
+ rli->relay_log.harvest_bytes_written(&rli->log_space_total);
+ }
}
else
if ((s_id == global_system_variables.server_id &&
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 3e4939b7fa8..546a3dca98c 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -1269,6 +1269,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int err;
String packet;
+ Format_description_log_event *fdev= NULL;
if (gtid_state->load((const rpl_gtid *)NULL, 0))
{
@@ -1280,6 +1281,13 @@ gtid_state_from_pos(const char *name, uint32 offset,
if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1)
return errormsg;
+ if (!(fdev= new Format_description_log_event(3)))
+ {
+ errormsg= "Out of memory initializing format_description event "
+ "while scanning binlog to find start position";
+ goto end;
+ }
+
/*
First we need to find the initial GTID_LIST_EVENT. We need this even
if the offset is at the very start of the binlog file.
@@ -1315,6 +1323,8 @@ gtid_state_from_pos(const char *name, uint32 offset,
typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
if (typ == FORMAT_DESCRIPTION_EVENT)
{
+ Format_description_log_event *tmp;
+
if (found_format_description_event)
{
errormsg= "Duplicate format description log event found while "
@@ -1324,6 +1334,15 @@ gtid_state_from_pos(const char *name, uint32 offset,
current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
found_format_description_event= true;
+ if (!(tmp= new Format_description_log_event(packet.ptr(), packet.length(),
+ fdev)))
+ {
+ errormsg= "Corrupt Format_description event found or out-of-memory "
+ "while searching for old-style position in binlog";
+ goto end;
+ }
+ delete fdev;
+ fdev= tmp;
}
else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event)
{
@@ -1348,7 +1367,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
}
status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
current_checksum_alg,
- &gtid_list, &list_len);
+ &gtid_list, &list_len, fdev);
if (status)
{
errormsg= "Error reading Gtid_list_log_event while searching "
@@ -1376,7 +1395,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
uchar flags2;
if (Gtid_log_event::peek(packet.ptr(), packet.length(),
current_checksum_alg, &gtid.domain_id,
- &gtid.server_id, &gtid.seq_no, &flags2))
+ &gtid.server_id, &gtid.seq_no, &flags2, fdev))
{
errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
"initial slave position";
@@ -1399,6 +1418,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
}
end:
+ delete fdev;
end_io_cache(&cache);
mysql_file_close(file, MYF(MY_WME));
@@ -1502,7 +1522,8 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
enum_gtid_until_state *gtid_until_group,
rpl_binlog_state *until_binlog_state,
bool slave_gtid_strict_mode, rpl_gtid *error_gtid,
- bool *send_fake_gtid_list)
+ bool *send_fake_gtid_list,
+ Format_description_log_event *fdev)
{
my_off_t pos;
size_t len= packet->length();
@@ -1516,7 +1537,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
if (ev_offset > len ||
Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
current_checksum_alg,
- &gtid_list, &list_len))
+ &gtid_list, &list_len, fdev))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_list_log_event: corrupt binlog";
@@ -1545,7 +1566,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
current_checksum_alg,
&event_gtid.domain_id, &event_gtid.server_id,
- &event_gtid.seq_no, &flags2))
+ &event_gtid.seq_no, &flags2, fdev))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_log_event: corrupt binlog";
@@ -1881,6 +1902,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
+ Format_description_log_event *fdev= NULL;
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
@@ -1956,6 +1978,13 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
#endif
+ if (!(fdev= new Format_description_log_event(3)))
+ {
+ errmsg= "Out of memory initializing format_description event";
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ goto err;
+ }
+
if (!mysql_bin_log.is_open())
{
errmsg = "Binary log is not open";
@@ -2119,6 +2148,8 @@ impossible position";
(*packet)[EVENT_TYPE_OFFSET+ev_offset]));
if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
+ Format_description_log_event *tmp;
+
current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
@@ -2136,6 +2167,18 @@ impossible position";
"slaves that cannot process them");
goto err;
}
+
+ if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
+ packet->length()-ev_offset,
+ fdev)))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ errmsg= "Corrupt Format_description event found or out-of-memory";
+ goto err;
+ }
+ delete fdev;
+ fdev= tmp;
+
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
@@ -2253,6 +2296,8 @@ impossible position";
#endif
if (event_type == FORMAT_DESCRIPTION_EVENT)
{
+ Format_description_log_event *tmp;
+
current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
@@ -2271,6 +2316,17 @@ impossible position";
goto err;
}
+ if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
+ packet->length()-ev_offset,
+ fdev)))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ errmsg= "Corrupt Format_description event found or out-of-memory";
+ goto err;
+ }
+ delete fdev;
+ fdev= tmp;
+
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
@@ -2295,7 +2351,7 @@ impossible position";
until_gtid_state, &gtid_until_group,
&until_binlog_state,
slave_gtid_strict_mode, &error_gtid,
- &send_fake_gtid_list)))
+ &send_fake_gtid_list, fdev)))
{
errmsg= tmp_msg;
goto err;
@@ -2501,7 +2557,7 @@ impossible position";
&gtid_skip_group, until_gtid_state,
&gtid_until_group, &until_binlog_state,
slave_gtid_strict_mode, &error_gtid,
- &send_fake_gtid_list)))
+ &send_fake_gtid_list, fdev)))
{
errmsg= tmp_msg;
goto err;
@@ -2599,6 +2655,7 @@ end:
thd->current_linfo = 0;
mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
+ delete fdev;
DBUG_VOID_RETURN;
err:
@@ -2674,6 +2731,7 @@ err:
if (file >= 0)
mysql_file_close(file, MYF(MY_WME));
thd->variables.max_allowed_packet= old_max_allowed_packet;
+ delete fdev;
my_message(my_errno, error_text, MYF(0));
DBUG_VOID_RETURN;