summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2011-05-03 14:01:11 +0200
committerunknown <knielsen@knielsen-hq.org>2011-05-03 14:01:11 +0200
commit014b8e7f435e495151fe0ef2cfd5e93d1a398ce5 (patch)
treeb432f6305fe79b126a19fa03124d28a7a2f05cbe /sql
parentd02d52629d2eda10a50079001a79d50bf3d528cd (diff)
downloadmariadb-git-014b8e7f435e495151fe0ef2cfd5e93d1a398ce5.tar.gz
Backport MySQL WL#2540 into MariaDB.
Patch backported: bzr diff '-rrevid:alfranio.correia@oracle.com-20101121143257-se3vpqus73l4mum0 ..revid:luis.soares@oracle.com-20101124111752-9b8260bd1qak87hr' --old=lp:mysql-server --new=lp:mysql-server
Diffstat (limited to 'sql')
-rw-r--r--sql/log.cc270
-rw-r--r--sql/log.h36
-rw-r--r--sql/log_event.cc597
-rw-r--r--sql/log_event.h117
-rw-r--r--sql/mysql_priv.h2
-rw-r--r--sql/mysqld.cc20
-rw-r--r--sql/repl_failsafe.cc6
-rw-r--r--sql/rpl_mi.cc3
-rw-r--r--sql/rpl_mi.h6
-rw-r--r--sql/rpl_rli.cc9
-rw-r--r--sql/rpl_utility.cc61
-rw-r--r--sql/share/errmsg.txt4
-rw-r--r--sql/slave.cc305
-rw-r--r--sql/sql_binlog.cc3
-rw-r--r--sql/sql_class.h2
-rw-r--r--sql/sql_repl.cc191
16 files changed, 1472 insertions, 160 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 22f6b5a75a4..36152be3337 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -68,6 +68,7 @@ static LEX_STRING const write_error_msg=
{ C_STRING_WITH_LEN("error writing to the binary log") };
static my_bool opt_optimize_thread_scheduling= TRUE;
+ulong binlog_checksum_options;
#ifndef DBUG_OFF
static ulong opt_binlog_dbug_fsync_sleep= 0;
#endif
@@ -2546,6 +2547,8 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG()
group_commit_queue(0), group_commit_queue_busy(FALSE),
num_commits(0), num_group_commits(0),
is_relay_log(0),
+ checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
+ relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
description_event_for_exec(0), description_event_for_queue(0)
{
/*
@@ -2781,10 +2784,23 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
as we won't be able to reset it later
*/
if (io_cache_type == WRITE_CACHE)
- s.flags|= LOG_EVENT_BINLOG_IN_USE_F;
+ s.flags |= LOG_EVENT_BINLOG_IN_USE_F;
+ s.checksum_alg= is_relay_log ?
+ /* relay-log */
+ /* inherit master's A descriptor if one has been received */
+ (relay_log_checksum_alg=
+ (relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
+ relay_log_checksum_alg :
+ /* otherwise use slave's local preference of RL events verification */
+ (opt_slave_sql_verify_checksum == 0) ?
+ (uint8) BINLOG_CHECKSUM_ALG_OFF : binlog_checksum_options):
+ /* binlog */
+ binlog_checksum_options;
+ DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if (!s.is_valid())
goto err;
s.dont_set_created= null_created_arg;
+ s.pre_55_writing_direct();
if (s.write(&log_file))
goto err;
bytes_written+= s.data_written;
@@ -2816,6 +2832,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/* Don't set log_pos in event header */
description_event_for_queue->set_artificial_event();
+ description_event_for_queue->pre_55_writing_direct();
if (description_event_for_queue->write(&log_file))
goto err;
bytes_written+= description_event_for_queue->data_written;
@@ -3955,8 +3972,16 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
We log the whole file name for log file as the user may decide
to change base names at some point.
*/
- Rotate_log_event r(new_name+dirname_length(new_name),
- 0, LOG_EVENT_OFFSET, is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
+ Rotate_log_event r(new_name+dirname_length(new_name), 0, LOG_EVENT_OFFSET,
+ is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
+ /*
+ The current relay-log's closing Rotate event must have checksum
+ value computed with an algorithm of the last relay-logged FD event.
+ */
+ if (is_relay_log)
+ r.checksum_alg= relay_log_checksum_alg;
+ r.pre_55_writing_direct();
+ DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) ||
(error= r.write(&log_file)))
{
@@ -3977,7 +4002,12 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
old_name=name;
name=0; // Don't free name
close(LOG_CLOSE_TO_BE_OPENED | LOG_CLOSE_INDEX);
-
+ if (log_type == LOG_BIN && checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ DBUG_ASSERT(!is_relay_log);
+ DBUG_ASSERT(binlog_checksum_options != checksum_alg_reset);
+ binlog_checksum_options= checksum_alg_reset;
+ }
/*
Note that at this point, log_state != LOG_CLOSED (important for is_open()).
*/
@@ -4056,6 +4086,7 @@ bool MYSQL_BIN_LOG::append(Log_event* ev)
Log_event::write() is smart enough to use my_b_write() or
my_b_append() depending on the kind of cache we have.
*/
+ ev->pre_55_writing_direct();
if (ev->write(&log_file))
{
error=1;
@@ -4451,6 +4482,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
{
/* Write directly to log file. */
pthread_mutex_lock(&LOCK_log);
+ pending->pre_55_writing_direct();
if (pending->write(&log_file))
{
pthread_mutex_unlock(&LOCK_log);
@@ -4493,6 +4525,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
{
THD *thd= event_info->thd;
bool error= 1;
+ uint16 cache_type;
DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)");
if (thd->binlog_evt_union.do_union)
@@ -4596,8 +4629,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
pthread_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
+ event_info->pre_55_writing_direct();
}
+ cache_type= event_info->cache_type;
+
/*
No check for auto events flag here - this write method should
never be called if auto-events are enabled
@@ -4611,7 +4647,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
if (with_annotate && *with_annotate)
{
DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT);
- Annotate_rows_log_event anno(thd);
+ Annotate_rows_log_event anno(thd, cache_type);
/* Annotate event should be written not more than once */
*with_annotate= 0;
if (anno.write(file))
@@ -4625,10 +4661,12 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
{
if (!thd->current_stmt_binlog_row_based)
{
+
if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt)
{
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
- thd->first_successful_insert_id_in_prev_stmt_for_binlog);
+ thd->first_successful_insert_id_in_prev_stmt_for_binlog,
+ cache_type);
if (e.write(file))
goto err_unlock;
}
@@ -4639,13 +4677,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
nb_elements()));
Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT,
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
- minimum());
+ minimum(), cache_type);
if (e.write(file))
goto err_unlock;
}
if (thd->rand_used)
{
- Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2);
+ Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2,
+ cache_type);
if (e.write(file))
goto err_unlock;
}
@@ -4660,7 +4699,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
user_var_event->value,
user_var_event->length,
user_var_event->type,
- user_var_event->charset_number);
+ user_var_event->charset_number,
+ cache_type);
if (e.write(file))
goto err_unlock;
}
@@ -4816,6 +4856,8 @@ int MYSQL_BIN_LOG::rotate_and_purge(uint flags)
#ifdef HAVE_REPLICATION
check_purge= true;
#endif
+ if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE)
+ checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF; // done
}
if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED))
pthread_mutex_unlock(&LOCK_log);
@@ -4844,6 +4886,33 @@ uint MYSQL_BIN_LOG::next_file_id()
}
+/**
+  Fold the part of one event that is present in the buffer into a running
+  checksum.
+
+  The event starts at @c buf + @c off and is @c event_len bytes long (its
+  length without the checksum). Only the event bytes lying below @c length
+  are passed to my_checksum(); the number of bytes left over is returned.
+
+  @param buf        start of the buffer holding the event data
+  @param off        offset of the event's first byte within @c buf
+  @param event_len  length of the event, checksum bytes not counted
+  @param length     current size of the buffer
+  @param crc        [in-out] running checksum, updated in place
+
+  @return 0 when the whole event lies below @c length, otherwise the number
+          of event bytes (checksum excluded) that were not processed yet.
+*/
+static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len,
+                               uint length, ha_checksum *crc)
+{
+  const uint event_end= off + event_len;
+  ulong unprocessed= 0;
+
+  /* The event runs past the end of the buffer: remember how much is missing */
+  if (length < event_end)
+    unprocessed= event_end - length;
+
+  *crc= my_checksum(*crc, buf + off, event_len - unprocessed);
+  return unprocessed;
+}
+
/*
Write the contents of a cache to the binary log.
@@ -4855,7 +4924,11 @@ uint MYSQL_BIN_LOG::next_file_id()
DESCRIPTION
Write the contents of the cache to the binary log. The cache will
be reset as a READ_CACHE to be able to read the contents from it.
- */
+
+ While reading from the trans cache, a checksum value is appended to each
+ event if @c binlog_checksum_options requires it, and the length and the
+ end_log_pos of the events are fixed before they go to the binlog cache.
+*/
int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
{
@@ -4863,8 +4936,17 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
return ER_ERROR_ON_WRITE;
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
+ ulong remains= 0; // part of unprocessed yet netto length of the event
long val;
+ ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN];
+ ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy
+ my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
+ uchar buf[BINLOG_CHECKSUM_LEN];
+
+ // while there is just one alg the following must hold:
+ DBUG_ASSERT(!do_checksum ||
+ binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32);
/*
The events in the buffer have incorrect end_log_pos data
@@ -4882,6 +4964,8 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
group= (uint)my_b_tell(&log_file);
hdr_offs= carry= 0;
+ if (do_checksum)
+ crc= crc_0= my_checksum(0L, NULL, 0);
do
{
@@ -4895,12 +4979,21 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
/* assemble both halves */
- memcpy(&header[carry], (char *)cache->read_pos, LOG_EVENT_HEADER_LEN - carry);
+ memcpy(&header[carry], (char *)cache->read_pos,
+ LOG_EVENT_HEADER_LEN - carry);
/* fix end_log_pos */
- val= uint4korr(&header[LOG_POS_OFFSET]) + group;
+ val= uint4korr(&header[LOG_POS_OFFSET]) + group +
+ (end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
int4store(&header[LOG_POS_OFFSET], val);
+ if (do_checksum)
+ {
+ ulong len= uint4korr(&header[EVENT_LEN_OFFSET]);
+ /* fix len */
+ int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN);
+ }
+
/* write the first half of the split header */
if (my_b_write(&log_file, header, carry))
return ER_ERROR_ON_WRITE;
@@ -4910,11 +5003,20 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
copy fixed second half of header to cache so the correct
version will be written later.
*/
- memcpy((char *)cache->read_pos, &header[carry], LOG_EVENT_HEADER_LEN - carry);
+ memcpy((char *)cache->read_pos, &header[carry],
+ LOG_EVENT_HEADER_LEN - carry);
/* next event header at ... */
- hdr_offs = uint4korr(&header[EVENT_LEN_OFFSET]) - carry;
+ hdr_offs= uint4korr(&header[EVENT_LEN_OFFSET]) - carry -
+ (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
+ if (do_checksum)
+ {
+ DBUG_ASSERT(crc == crc_0 && remains == 0);
+ crc= my_checksum(crc, header, carry);
+ remains= uint4korr(header + EVENT_LEN_OFFSET) - carry -
+ BINLOG_CHECKSUM_LEN;
+ }
carry= 0;
}
@@ -4929,6 +5031,25 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
very next iteration, just "eventually").
*/
+ /* crc-calc the whole buffer */
+ if (do_checksum && hdr_offs >= length)
+ {
+
+ DBUG_ASSERT(remains != 0 && crc != crc_0);
+
+ crc= my_checksum(crc, cache->read_pos, length);
+ remains -= length;
+ if (my_b_write(&log_file, cache->read_pos, length))
+ return ER_ERROR_ON_WRITE;
+ if (remains == 0)
+ {
+ int4store(buf, crc);
+ if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
+ return ER_ERROR_ON_WRITE;
+ crc= crc_0;
+ }
+ }
+
while (hdr_offs < length)
{
/*
@@ -4936,6 +5057,26 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
we get the rest.
*/
+ if (do_checksum)
+ {
+ if (remains != 0)
+ {
+ /*
+ finish off with remains of the last event that crawls
+ from previous into the current buffer
+ */
+ DBUG_ASSERT(crc != crc_0);
+ crc= my_checksum(crc, cache->read_pos, hdr_offs);
+ int4store(buf, crc);
+ remains -= hdr_offs;
+ DBUG_ASSERT(remains == 0);
+ if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
+ my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
+ return ER_ERROR_ON_WRITE;
+ crc= crc_0;
+ }
+ }
+
if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
{
carry= length - hdr_offs;
@@ -4945,17 +5086,38 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
else
{
/* we've got a full event-header, and it came in one piece */
-
- uchar *log_pos= (uchar *)cache->read_pos + hdr_offs + LOG_POS_OFFSET;
+ uchar *ev= (uchar *)cache->read_pos + hdr_offs;
+ uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
+ uchar *log_pos= ev + LOG_POS_OFFSET;
/* fix end_log_pos */
- val= uint4korr(log_pos) + group;
+ val= uint4korr(log_pos) + group +
+ (end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
int4store(log_pos, val);
+ /* fix CRC */
+ if (do_checksum)
+ {
+ /* fix length */
+ int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN);
+ remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len,
+ length, &crc);
+ if (my_b_write(&log_file, ev,
+ remains == 0 ? event_len : length - hdr_offs))
+ return ER_ERROR_ON_WRITE;
+ if (remains == 0)
+ {
+ int4store(buf, crc);
+ if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
+ return ER_ERROR_ON_WRITE;
+ crc= crc_0; // crc is complete
+ }
+ }
+
/* next event header at ... */
- log_pos= (uchar *)cache->read_pos + hdr_offs + EVENT_LEN_OFFSET;
- hdr_offs += uint4korr(log_pos);
+ hdr_offs += event_len; // incr by the netto len
+ DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length);
}
}
@@ -4973,14 +5135,17 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
/* Write data to the binary log file */
DBUG_EXECUTE_IF("fail_binlog_write_1",
errno= 28; return ER_ERROR_ON_WRITE;);
- if (my_b_write(&log_file, cache->read_pos, length))
- return ER_ERROR_ON_WRITE;
+ if (!do_checksum)
+ if (my_b_write(&log_file, cache->read_pos, length))
+ return ER_ERROR_ON_WRITE;
status_var_add(thd->status_var.binlog_bytes_written, length);
cache->read_pos=cache->read_end; // Mark buffer used up
} while ((length= my_b_fill(cache)));
DBUG_ASSERT(carry == 0);
+ DBUG_ASSERT(!do_checksum || remains == 0);
+ DBUG_ASSERT(!do_checksum || crc == crc_0);
return 0; // All OK
}
@@ -5024,6 +5189,7 @@ bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd)
if (likely(is_open()))
{
+ ev.pre_55_writing_direct();
error= ev.write(&log_file);
status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
}
@@ -5414,6 +5580,7 @@ MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry)
binlog_trx_data *trx_data= entry->trx_data;
IO_CACHE *cache= &trx_data->trans_log;
+ entry->begin_event->pre_55_writing_direct();
if (entry->begin_event->write(&log_file))
return ER_ERROR_ON_WRITE;
status_var_add(entry->thd->status_var.binlog_bytes_written,
@@ -5433,13 +5600,18 @@ MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry)
if (write_cache(entry->thd, cache))
return ER_ERROR_ON_WRITE;
+ entry->end_event->pre_55_writing_direct();
if (entry->end_event->write(&log_file))
return ER_ERROR_ON_WRITE;
status_var_add(entry->thd->status_var.binlog_bytes_written,
entry->end_event->data_written);
- if (entry->incident_event && entry->incident_event->write(&log_file))
- return ER_ERROR_ON_WRITE;
+ if (entry->incident_event)
+ {
+ entry->incident_event->pre_55_writing_direct();
+ if (entry->incident_event->write(&log_file))
+ return ER_ERROR_ON_WRITE;
+ }
if (cache->error) // Error on read
return ER_ERROR_ON_READ;
@@ -5503,6 +5675,12 @@ void MYSQL_BIN_LOG::close(uint exiting)
(exiting & LOG_CLOSE_STOP_EVENT))
{
Stop_log_event s;
+ // the checksumming rule for relay-log case is similar to Rotate
+ s.checksum_alg= is_relay_log ?
+ relay_log_checksum_alg : binlog_checksum_options;
+ DBUG_ASSERT(!is_relay_log ||
+ relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
+ s.pre_55_writing_direct();
s.write(&log_file);
bytes_written+= s.data_written;
signal_update();
@@ -6590,7 +6768,8 @@ int TC_LOG_BINLOG::open(const char *opt_name)
goto err;
}
- if ((ev= Log_event::read_log_event(&log, 0, &fdle)) &&
+ if ((ev= Log_event::read_log_event(&log, 0, &fdle,
+ opt_master_verify_checksum)) &&
ev->get_type_code() == FORMAT_DESCRIPTION_EVENT &&
ev->flags & LOG_EVENT_BINLOG_IN_USE_F)
{
@@ -6729,7 +6908,9 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
fdle->flags&= ~LOG_EVENT_BINLOG_IN_USE_F; // abort on the first error
- while ((ev= Log_event::read_log_event(log,0,fdle)) && ev->is_valid())
+ while ((ev= Log_event::read_log_event(log, 0, fdle,
+ opt_master_verify_checksum))
+ && ev->is_valid())
{
if (ev->get_type_code() == XID_EVENT)
{
@@ -6809,6 +6990,32 @@ mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file)
#endif /* INNODB_COMPATIBILITY_HOOKS */
+/**
+  Update callback for the binlog checksum system variable.
+
+  With the binary log open, the log is rotated; if the requested algorithm
+  differs from @c binlog_checksum_options, the new value is passed on through
+  @c mysql_bin_log.checksum_alg_reset together with the
+  RP_BINLOG_CHECKSUM_ALG_CHANGE flag. With the log closed,
+  @c binlog_checksum_options is assigned directly.
+
+  @param thd      not used here
+  @param var      not used here
+  @param var_ptr  not used here; presumably the storage of the variable,
+                  i.e. the value currently in effect
+  @param save     points at the requested new value
+*/
+static void
+binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
+                       void *var_ptr, const void *save)
+{
+  /*
+    Read the requested value from `save'. Reading it from `var_ptr' would
+    yield the value already in effect, so a change would never be detected.
+  */
+  ulong value= *((const ulong *) save);
+
+  pthread_mutex_lock(mysql_bin_log.get_log_lock());
+  if(mysql_bin_log.is_open())
+  {
+    uint flags= RP_FORCE_ROTATE | RP_LOCK_LOG_IS_ALREADY_LOCKED |
+      (binlog_checksum_options != (uint) value?
+       RP_BINLOG_CHECKSUM_ALG_CHANGE : 0);
+    if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE)
+      mysql_bin_log.checksum_alg_reset= (uint8) value;
+    mysql_bin_log.rotate_and_purge(flags);
+  }
+  else
+  {
+    binlog_checksum_options= value;
+  }
+  DBUG_ASSERT((ulong) binlog_checksum_options == value);
+  DBUG_ASSERT(mysql_bin_log.checksum_alg_reset == BINLOG_CHECKSUM_ALG_UNDEF);
+  pthread_mutex_unlock(mysql_bin_log.get_log_lock());
+}
+
+
static int show_binlog_vars(THD *thd, SHOW_VAR *var, char *buff)
{
mysql_bin_log.set_status_variables(thd);
@@ -6835,6 +7042,18 @@ static MYSQL_SYSVAR_BOOL(
NULL,
1);
+static MYSQL_SYSVAR_ENUM(
+ checksum, /* name under which the variable is listed via MYSQL_SYSVAR(checksum) */
+ binlog_checksum_options, /* global that this variable is bound to */
+ PLUGIN_VAR_RQCMDARG,
+ "Type of BINLOG_CHECKSUM_ALG. Include checksum for "
+ "log events in the binary log. Possible values are NONE and CRC32; "
+ "default is NONE.",
+ NULL, /* presumably the check-callback slot, none supplied -- confirm against plugin.h */
+ binlog_checksum_update, /* callback invoked to apply a new value */
+ BINLOG_CHECKSUM_ALG_OFF, /* default; the help text above calls it NONE */
+ &binlog_checksum_typelib); /* typelib naming the accepted values */
+
#ifndef DBUG_OFF
static MYSQL_SYSVAR_ULONG(
dbug_fsync_sleep,
@@ -6852,6 +7071,7 @@ static MYSQL_SYSVAR_ULONG(
static struct st_mysql_sys_var *binlog_sys_vars[]=
{
MYSQL_SYSVAR(optimize_thread_scheduling),
+ MYSQL_SYSVAR(checksum),
#ifndef DBUG_OFF
MYSQL_SYSVAR(dbug_fsync_sleep),
#endif
diff --git a/sql/log.h b/sql/log.h
index 503e94df981..6437046b17b 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -403,7 +403,41 @@ public:
/* This is relay log */
bool is_relay_log;
-
+ uint8 checksum_alg_reset; // to contain a new value when binlog is rotated
+ /*
+ Holds the last seen in Relay-Log FD's checksum alg value.
+ The initial value comes from the slave's local FD that heads
+ the very first Relay-Log file. In the following the value may change
+ with each received master's FD_m.
+ Besides being used to verify the events that the IO thread receives
+ (except the 1st fake Rotate, see @c Master_info::checksum_alg_before_fd),
+ the value specifies if/how to compute the checksum for the slave's local
+ events and for the first fake Rotate (R_f^1) coming from the master.
+ R_f^1 must be logged checksum-compatibly with the RL's heading FD_s.
+
+ Legends for the checksum related comments:
+
+ FD - Format-Description event,
+ R - Rotate event
+ R_f - the fake Rotate event
+ E - an arbitrary event
+
+ The underscore indexes for any event
+ `_s' indicates the event is generated by Slave
+ `_m' - by Master
+
+ Two special underscore indexes of FD:
+ FD_q - Format Description event for queuing (relay-logging)
+ FD_e - Format Description event for executing (relay-logging)
+
+ Upper indexes:
+ E^n - the n:th event in a sequence
+
+ RL - Relay Log
+ (A) - checksum algorithm descriptor value
+ FD.(A) - the value of (A) in FD
+ */
+ uint8 relay_log_checksum_alg;
/*
These describe the log's format. This is used only for relay logs.
_for_exec is used by the SQL thread, _for_queue by the I/O thread. It's
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 0259ff762ad..b31c991b9f3 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -38,6 +38,31 @@
#include <base64.h>
#include <my_bitmap.h>
+
+/**
+ Value names of the BINLOG_CHECKSUM variable and the TYPELIB built from them.
+
+ binlog_checksum_type_names lists the names ("NONE", "CRC32") and is
+ terminated by NullS; binlog_checksum_type_length holds the matching string
+ lengths and is terminated by 0. binlog_checksum_typelib is initialised with
+ the number of array elements minus one, i.e. the terminator is not counted.
+*/
+const char *binlog_checksum_type_names[]= {
+ "NONE",
+ "CRC32",
+ NullS
+};
+
+unsigned int binlog_checksum_type_length[]= {
+ sizeof("NONE") - 1,
+ sizeof("CRC32") - 1,
+ 0
+};
+
+TYPELIB binlog_checksum_typelib=
+{
+ array_elements(binlog_checksum_type_names) - 1, "",
+ binlog_checksum_type_names,
+ binlog_checksum_type_length
+};
+
+
+
#define log_cs &my_charset_latin1
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
@@ -51,6 +76,24 @@
*/
#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
+/*
+ The replication event checksum is introduced in the following
+ "checksum-home" versions. Checksum-aware servers extract the FD's version
+ to decide whether the FD event carries checksum info.
+
+ Each checksum_version_split_* array holds three version components
+ ({5, 6, 1} for MySQL, {5, 2, 5} for MariaDB). The matching
+ checksum_version_product_* packs them into one number as
+ (split[0] * 256 + split[1]) * 256 + split[2].
+
+ TODO: correct the constant when it has been determined
+ (which main tree to push and when)
+*/
+const uchar checksum_version_split_mysql[3]= {5, 6, 1};
+const ulong checksum_version_product_mysql=
+ (checksum_version_split_mysql[0] * 256 +
+ checksum_version_split_mysql[1]) * 256 +
+ checksum_version_split_mysql[2];
+const uchar checksum_version_split_mariadb[3]= {5, 2, 5};
+const ulong checksum_version_product_mariadb=
+ (checksum_version_split_mariadb[0] * 256 +
+ checksum_version_split_mariadb[1]) * 256 +
+ checksum_version_split_mariadb[2];
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD* thd);
@@ -610,7 +653,6 @@ static void print_set_option(IO_CACHE* file, uint32 bits_changed,
}
}
#endif
-
/**************************************************************************
Log_event methods (= the parent class of all events)
**************************************************************************/
@@ -666,7 +708,9 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
- :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg)
+ :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg),
+ cache_type(EVENT_INVALID_CACHE), crc(0), thd(thd_arg),
+ checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
server_id= thd->server_id;
when= thd->start_time;
@@ -683,7 +727,8 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
Log_event::Log_event()
:temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
- thd(0)
+ cache_type(EVENT_INVALID_CACHE), crc(0),
+ thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
server_id= ::server_id;
/*
@@ -702,7 +747,8 @@ Log_event::Log_event()
Log_event::Log_event(const char* buf,
const Format_description_log_event* description_event)
- :temp_buf(0), cache_stmt(0)
+ :temp_buf(0), cache_stmt(0), cache_type(EVENT_INVALID_CACHE),
+ crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
#ifndef MYSQL_CLIENT
thd = 0;
@@ -887,6 +933,106 @@ void Log_event::init_show_field_list(List<Item>* field_list)
field_list->push_back(new Item_empty_string("Info", 20));
}
+/**
+ Decide whether a checksum has to be computed for this event.
+ To be invoked in the Log_event::write() stack.
+ The decision is positive
+
+ S,M) if the event has been marked for checksumming with @c checksum_alg
+ (i.e. checksum_alg is neither UNDEF nor OFF)
+
+ M) otherwise, if @@global.binlog_checksum is not NONE and the event is
+ directly written to the binlog file (cache_type is EVENT_NO_CACHE).
+ The to-be-cached event decides at @c write_cache() time.
+
+ Otherwise the decision is negative.
+
+ @note A side effect of the method is altering Log_event::checksum_alg
+ if the latter was undefined at calling: it becomes
+ binlog_checksum_options on a positive decision and
+ BINLOG_CHECKSUM_ALG_OFF on a negative one.
+
+ @note NOTE(review): the last DBUG_ASSERT in the body looks always true as
+ written, because an event type cannot be FORMAT_DESCRIPTION_EVENT and at
+ the same time ROTATE_EVENT or STOP_EVENT; so it never gets to check
+ cache_type. Confirm the intended grouping of the conditions.
+
+ @return non-zero (positive) or FALSE (negative)
+*/
+my_bool Log_event::need_checksum()
+{
+ DBUG_ENTER("Log_event::need_checksum");
+ my_bool ret;
+ extern ulong binlog_checksum_options;
+ /*
+ A few callers of Log_event::write
+ (incl FD::write, FD constructing code on the slave side, Rotate relay log
+ and Stop event)
+ provide their checksum alg preference through Log_event::checksum_alg.
+ */
+ ret= (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
+ (checksum_alg != BINLOG_CHECKSUM_ALG_OFF) :
+ ((binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF) &&
+ (cache_type == Log_event::EVENT_NO_CACHE))? binlog_checksum_options :
+ FALSE;
+
+ /*
+ FD calls the method before data_written has been calculated.
+ The following invariant claims that if the current call is not the
+ first one (and therefore data_written is not zero) then `ret' must be
+ TRUE. It may not be zero because FD is always checksummed.
+ */
+
+ DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret ||
+ data_written == 0);
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
+ checksum_alg= ret ? // calculated value stored
+ binlog_checksum_options : (uint8) BINLOG_CHECKSUM_ALG_OFF;
+
+ DBUG_ASSERT(!ret ||
+ ((checksum_alg == binlog_checksum_options ||
+ /*
+ The Stop event closes the relay-log; its checksum alg
+ preference is set by the caller and can be different
+ from the server's binlog_checksum_options.
+ */
+ get_type_code() == STOP_EVENT ||
+ /*
+ Rotate:s can be checksummed regardless of the server's
+ binlog_checksum_options. That applies to both
+ the local RL's Rotate and the master's Rotate
+ which IO thread instantiates via queue_binlog_ver_3_event.
+ */
+ get_type_code() == ROTATE_EVENT
+ || /* FD is always checksummed */
+ get_type_code() == FORMAT_DESCRIPTION_EVENT) &&
+ checksum_alg != BINLOG_CHECKSUM_ALG_OFF));
+
+ DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
+
+ DBUG_ASSERT(((get_type_code() != ROTATE_EVENT &&
+ get_type_code() != STOP_EVENT) ||
+ get_type_code() != FORMAT_DESCRIPTION_EVENT) ||
+ cache_type == Log_event::EVENT_NO_CACHE);
+
+ DBUG_RETURN(ret);
+}
+
+/**
+  Write @c size bytes of @c buf to @c file through my_b_safe_write().
+
+  If need_checksum() is positive and @c size is not zero, the bytes are first
+  folded into Log_event::crc with my_checksum().
+
+  @return the result of my_b_safe_write()
+*/
+bool Log_event::wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong size)
+{
+  /* need_checksum() is consulted on every call, also for empty writes */
+  const my_bool checksummed= need_checksum();
+
+  if (checksummed && size != 0)
+    crc= my_checksum(crc, buf, size);
+
+  return my_b_safe_write(file, buf, size);
+}
+
+/**
+  Write the event footer to @c file.
+
+  When need_checksum() is positive, Log_event::crc is stored with int4store()
+  into a buffer of BINLOG_CHECKSUM_LEN bytes which is then written with
+  my_b_safe_write(). Otherwise nothing is written.
+
+  @return the result of the write, or 0 when no footer is needed
+*/
+bool Log_event::write_footer(IO_CACHE* file)
+{
+  if (!need_checksum())
+    return 0;
+
+  uchar footer[BINLOG_CHECKSUM_LEN];
+  int4store(footer, crc);
+  return (my_b_safe_write(file, (uchar*) footer, sizeof(footer)));
+}
/*
Log_event::write()
@@ -896,11 +1042,18 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
{
uchar header[LOG_EVENT_HEADER_LEN];
ulong now;
+ bool ret;
DBUG_ENTER("Log_event::write_header");
/* Store number of bytes that will be written by this event */
data_written= event_data_length + sizeof(header);
+ if (need_checksum())
+ {
+ crc= my_checksum(0L, NULL, 0);
+ data_written += BINLOG_CHECKSUM_LEN;
+ }
+
/*
log_pos != 0 if this is relay-log event. In this case we should not
change the position
@@ -959,9 +1112,36 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
int4store(header+ SERVER_ID_OFFSET, server_id);
int4store(header+ EVENT_LEN_OFFSET, data_written);
int4store(header+ LOG_POS_OFFSET, log_pos);
- int2store(header+ FLAGS_OFFSET, flags);
-
- DBUG_RETURN(my_b_safe_write(file, header, sizeof(header)) != 0);
+ /*
+ The checksum of an FD event is computed with a possibly active
+ LOG_EVENT_BINLOG_IN_USE_F flag cleared, while the flag itself is
+ still written to the file. Verification takes a similar step: the
+ active flag is dropped before the checksum is computed.
+ */
+ if (header[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT ||
+ !need_checksum() || !(flags & LOG_EVENT_BINLOG_IN_USE_F))
+ {
+ int2store(header+ FLAGS_OFFSET, flags);
+ ret= wrapper_my_b_safe_write(file, header, sizeof(header)) != 0;
+ }
+ else
+ {
+ ret= (wrapper_my_b_safe_write(file, header, FLAGS_OFFSET) != 0);
+ if (!ret)
+ {
+ flags &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ int2store(header + FLAGS_OFFSET, flags);
+ crc= my_checksum(crc, header + FLAGS_OFFSET, sizeof(flags));
+ flags |= LOG_EVENT_BINLOG_IN_USE_F;
+ int2store(header + FLAGS_OFFSET, flags);
+ ret= (my_b_safe_write(file, header + FLAGS_OFFSET, sizeof(flags)) != 0);
+ }
+ if (!ret)
+ ret= (wrapper_my_b_safe_write(file, header + FLAGS_OFFSET + sizeof(flags),
+ sizeof(header)
+ - (FLAGS_OFFSET + sizeof(flags))) != 0);
+ }
+ DBUG_RETURN( ret);
}
@@ -971,11 +1151,13 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
*/
int Log_event::read_log_event(IO_CACHE* file, String* packet,
- pthread_mutex_t* log_lock)
+ pthread_mutex_t* log_lock,
+ uint8 checksum_alg_arg)
{
ulong data_len;
int result=0;
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
+ uchar ev_offset= packet->length();
DBUG_ENTER("Log_event::read_log_event");
if (log_lock)
@@ -1033,6 +1215,31 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
/* Implicit goto end; */
}
+ else
+ {
+ /* Corrupt the event for Dump thread*/
+ DBUG_EXECUTE_IF("corrupt_read_log_event2",
+ uchar *debug_event_buf_c = (uchar*) packet->ptr() + ev_offset;
+ if (debug_event_buf_c[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
+ {
+ int debug_cor_pos = rand() % (data_len + sizeof(buf) - BINLOG_CHECKSUM_LEN);
+ debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
+ DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event: byte on position %d", debug_cor_pos));
+ DBUG_SET("-d,corrupt_read_log_event2");
+ }
+ );
+ /*
+ CRC verification of the Dump thread
+ */
+ if (opt_master_verify_checksum &&
+ event_checksum_test((uchar*) packet->ptr() + ev_offset,
+ data_len + sizeof(buf),
+ checksum_alg_arg))
+ {
+ result= LOG_READ_CHECKSUM_FAILURE;
+ goto end;
+ }
+ }
}
end:
@@ -1058,11 +1265,13 @@ end:
Log_event* Log_event::read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
const Format_description_log_event
- *description_event)
+ *description_event,
+ my_bool crc_check)
#else
Log_event* Log_event::read_log_event(IO_CACHE* file,
const Format_description_log_event
- *description_event)
+ *description_event,
+ my_bool crc_check)
#endif
{
DBUG_ENTER("Log_event::read_log_event");
@@ -1126,7 +1335,7 @@ failed my_b_read"));
error = "read error";
goto err;
}
- if ((res= read_log_event(buf, data_len, &error, description_event)))
+ if ((res= read_log_event(buf, data_len, &error, description_event, crc_check)))
res->register_temp_buf(buf, TRUE);
err:
@@ -1159,9 +1368,11 @@ err:
Log_event* Log_event::read_log_event(const char* buf, uint event_len,
const char **error,
- const Format_description_log_event *description_event)
+ const Format_description_log_event *description_event,
+ my_bool crc_check)
{
Log_event* ev;
+ uint8 alg;
DBUG_ENTER("Log_event::read_log_event(char*,...)");
DBUG_ASSERT(description_event != 0);
DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version));
@@ -1177,6 +1388,60 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
}
uint event_type= (uchar)buf[EVENT_TYPE_OFFSET];
+ // all following START events in the current file are without checksum
+ if (event_type == START_EVENT_V3)
+ (const_cast< Format_description_log_event *>(description_event))->checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
+ /*
+ CRC verification by SQL and Show-Binlog-Events master side.
+ The caller has to provide @description_event->checksum_alg to
+ be the last seen FD's (A) descriptor.
+ If event is FD the descriptor is in it.
+ Notice, FD of the binlog can be only in one instance and therefore
+ Show-Binlog-Events executing master side thread needs just to know
+ the only FD's (A) value - whereas RL can contain more.
+ In the RL case, the alg is kept in FD_e (@description_event) which is reset
+ to the newer read-out event after its execution with possibly new alg descriptor.
+ Therefore in a typical sequence of RL:
+ {FD_s^0, FD_m, E_m^1} E_m^1
+ will be verified with (A) of FD_m.
+
+ See legends definition on MYSQL_BIN_LOG::relay_log_checksum_alg docs
+ lines (log.h).
+
+ Notice, a pre-checksum FD version forces alg := BINLOG_CHECKSUM_ALG_UNDEF.
+ */
+ alg= (event_type != FORMAT_DESCRIPTION_EVENT) ?
+ description_event->checksum_alg : get_checksum_alg(buf, event_len);
+ // Emulate the corruption during reading an event
+ DBUG_EXECUTE_IF("corrupt_read_log_event_char",
+ if (event_type != FORMAT_DESCRIPTION_EVENT)
+ {
+ char *debug_event_buf_c = (char *)buf;
+ int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN);
+ debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
+ DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event(char*,...): byte on position %d", debug_cor_pos));
+ DBUG_SET("-d,corrupt_read_log_event_char");
+ }
+ );
+ if (crc_check &&
+ event_checksum_test((uchar *) buf, event_len, alg))
+ {
+#ifdef MYSQL_CLIENT
+ *error= "Event crc check failed! Most likely there is event corruption.";
+ if (force_opt)
+ {
+ ev= new Unknown_log_event(buf, description_event);
+ DBUG_RETURN(ev);
+ }
+ else
+ DBUG_RETURN(NULL);
+#else
+ *error= ER(ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE);
+ sql_print_error("%s", ER(ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE));
+ DBUG_RETURN(NULL);
+#endif
+ }
+
if (event_type > description_event->number_of_event_types &&
event_type != FORMAT_DESCRIPTION_EVENT)
{
@@ -1215,6 +1480,11 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
event_type= description_event->event_type_permutation[event_type];
}
+ if (alg != BINLOG_CHECKSUM_ALG_UNDEF &&
+ (event_type == FORMAT_DESCRIPTION_EVENT ||
+ alg != BINLOG_CHECKSUM_ALG_OFF))
+ event_len= event_len - BINLOG_CHECKSUM_LEN;
+
switch(event_type) {
case QUERY_EVENT:
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
@@ -1309,6 +1579,14 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
}
}
+ if (ev)
+ {
+ ev->checksum_alg= alg;
+ if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ ev->crc= uint4korr(buf + (event_len));
+ }
+
DBUG_PRINT("read_event", ("%s(type_code: %d; event_len: %d)",
ev ? ev->get_type_str() : "<unknown>",
buf[EVENT_TYPE_OFFSET],
@@ -1363,6 +1641,18 @@ void Log_event::print_header(IO_CACHE* file,
my_b_printf(file, " server id %lu end_log_pos %s ", (ulong) server_id,
llstr(log_pos,llbuff));
+ /* print the checksum */
+
+ if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ char checksum_buf[BINLOG_CHECKSUM_LEN * 2 + 4]; // to fit to "0x%lx "
+ size_t const bytes_written=
+ my_snprintf(checksum_buf, sizeof(checksum_buf), "0x%08lx ", (ulong) crc);
+ my_b_printf(file, "%s ", get_type(&binlog_checksum_typelib, checksum_alg));
+ my_b_printf(file, checksum_buf, bytes_written);
+ }
+
/* mysqlbinlog --hexdump */
if (print_event_info->hexdump_from)
{
@@ -2028,6 +2318,9 @@ void Log_event::print_base64(IO_CACHE* file,
if (print_event_info->verbose)
{
Rows_log_event *ev= NULL;
+ if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF &&
+ checksum_alg != BINLOG_CHECKSUM_ALG_OFF)
+ size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header
if (ptr[4] == TABLE_MAP_EVENT)
{
@@ -2390,12 +2683,13 @@ bool Query_log_event::write(IO_CACHE* file)
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
return (write_header(file, event_length) ||
- my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) ||
write_post_header_for_derived(file) ||
- my_b_safe_write(file, (uchar*) start_of_status,
+ wrapper_my_b_safe_write(file, (uchar*) start_of_status,
(uint) (start-start_of_status)) ||
- my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) ||
- my_b_safe_write(file, (uchar*) query, q_len)) ? 1 : 0;
+ wrapper_my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) ||
+ wrapper_my_b_safe_write(file, (uchar*) query, q_len) ||
+ write_footer(file)) ? 1 : 0;
}
/**
@@ -3623,7 +3917,8 @@ bool Start_log_event_v3::write(IO_CACHE* file)
created= when= get_time();
int4store(buff + ST_CREATED_OFFSET,created);
return (write_header(file, sizeof(buff)) ||
- my_b_safe_write(file, (uchar*) buff, sizeof(buff)));
+ wrapper_my_b_safe_write(file, (uchar*) buff, sizeof(buff)) ||
+ write_footer(file));
}
#endif
@@ -3725,6 +4020,7 @@ int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
old 4.0 (binlog version 2) is not supported;
it should not be used for replication with
5.0.
+ @param server_ver a string containing the server version.
*/
Format_description_log_event::
@@ -3740,9 +4036,9 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
common_header_len= LOG_EVENT_HEADER_LEN;
number_of_event_types= LOG_EVENT_TYPES;
/* we'll catch my_malloc() error in is_valid() */
- post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8),
+ post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8)
+ + BINLOG_CHECKSUM_ALG_DESC_LEN,
MYF(0));
-
/*
This long list of assignments is not beautiful, but I see no way to
make it nicer, as the right members are #defines, not array members, so
@@ -3866,6 +4162,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
break;
}
calc_server_version_split();
+ checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_UNDEF;
}
@@ -3900,14 +4197,26 @@ Format_description_log_event(const char* buf,
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
DBUG_VOID_RETURN; /* sanity check */
number_of_event_types=
- event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
+ event_len - (LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET + 1);
DBUG_PRINT("info", ("common_header_len=%d number_of_event_types=%d",
common_header_len, number_of_event_types));
/* If alloc fails, we'll detect it in is_valid() */
+
post_header_len= (uint8*) my_memdup((uchar*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
number_of_event_types*
- sizeof(*post_header_len), MYF(0));
+ sizeof(*post_header_len),
+ MYF(0));
calc_server_version_split();
+ if (!is_version_before_checksum(&server_version_split))
+ {
+ /* the last bytes are the checksum alg desc and value (or value's room) */
+ number_of_event_types -= BINLOG_CHECKSUM_ALG_DESC_LEN;
+ checksum_alg= post_header_len[number_of_event_types];
+ }
+ else
+ {
+ checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_UNDEF;
+ }
/*
In some previous versions, the events were given other event type
@@ -4018,21 +4327,59 @@ Format_description_log_event(const char* buf,
#ifndef MYSQL_CLIENT
bool Format_description_log_event::write(IO_CACHE* file)
{
+ bool ret;
+ bool no_checksum;
/*
We don't call Start_log_event_v3::write() because this would make 2
my_b_safe_write().
*/
- uchar buff[FORMAT_DESCRIPTION_HEADER_LEN];
+ uchar buff[FORMAT_DESCRIPTION_HEADER_LEN + BINLOG_CHECKSUM_ALG_DESC_LEN];
+ size_t rec_size= sizeof(buff);
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
if (!dont_set_created)
created= when= get_time();
int4store(buff + ST_CREATED_OFFSET,created);
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
- memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET+1, (uchar*) post_header_len,
+ memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET + 1, (uchar*) post_header_len,
LOG_EVENT_TYPES);
- return (write_header(file, sizeof(buff)) ||
- my_b_safe_write(file, buff, sizeof(buff)));
+ /*
+ If checksumming is requested, record the checksum-algorithm descriptor (A)
+ right after the post_header_len vector; the checksum value (V) follows it.
+ The master triggers checksum computing via binlog_checksum_options, the
+ slave by marking the event according to FD_queue's checksum_alg value.
+ The assert below checks that (A) occupies exactly the one extra byte
+ reserved at the end of buff[].
+ */
+ compile_time_assert(BINLOG_CHECKSUM_ALG_DESC_LEN == 1);
+#ifndef DBUG_OFF
+ data_written= 0; // to prepare for need_checksum assert
+#endif
+ buff[FORMAT_DESCRIPTION_HEADER_LEN]= need_checksum() ?
+ checksum_alg : (uint8) BINLOG_CHECKSUM_ALG_OFF;
+ /*
+ FD of checksum-aware server is always checksum-equipped, (V) is in,
+ regardless of @@global.binlog_checksum policy.
+ Thereby a combination of (A) == 0, (V) != 0 means
+ it's the checksum-aware server's FD event that heads checksum-free binlog
+ file.
+ Here 0 stands for checksumming OFF to evaluate (V) as 0 in that case.
+ A combination of (A) != 0, (V) != 0 denotes FD of the checksum-aware server
+ heading the checksummed binlog.
+ (A), (V) presence in FD of the checksum-aware server makes the event
+ 1 + 4 bytes bigger compared to the former FD.
+ */
+
+ if ((no_checksum= (checksum_alg == BINLOG_CHECKSUM_ALG_OFF)))
+ {
+ checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway
+ }
+ ret= (write_header(file, rec_size) ||
+ wrapper_my_b_safe_write(file, buff, rec_size) ||
+ write_footer(file));
+ if (no_checksum)
+ checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
+ return ret;
}
#endif
@@ -4128,6 +4475,30 @@ Format_description_log_event::do_shall_skip(Relay_log_info *rli)
#endif
+static inline void
+do_server_version_split(char* version,
+ Format_description_log_event::master_version_split *split_versions)
+{
+ char *p= version, *r;
+ ulong number;
+ for (uint i= 0; i<=2; i++)
+ {
+ number= strtoul(p, &r, 10);
+ split_versions->ver[i]= (uchar) number;
+ DBUG_ASSERT(number < 256); // fit in uchar
+ p= r;
+ DBUG_ASSERT(!((i == 0) && (*r != '.'))); // should be true in practice
+ if (*r == '.')
+ p++; // skip the dot
+ }
+ if (strinstr(p, "MariaDB") != 0 || strinstr(p, "-maria-") != 0)
+ split_versions->kind=
+ Format_description_log_event::master_version_split::KIND_MARIADB;
+ else
+ split_versions->kind=
+ Format_description_log_event::master_version_split::KIND_MYSQL;
+}
+
/**
Splits the event's 'server_version' string into three numeric pieces stored
@@ -4140,24 +4511,67 @@ Format_description_log_event::do_shall_skip(Relay_log_info *rli)
*/
void Format_description_log_event::calc_server_version_split()
{
- char *p= server_version, *r;
- ulong number;
- for (uint i= 0; i<=2; i++)
- {
- number= strtoul(p, &r, 10);
- server_version_split[i]= (uchar)number;
- DBUG_ASSERT(number < 256); // fit in uchar
- p= r;
- DBUG_ASSERT(!((i == 0) && (*r != '.'))); // should be true in practice
- if (*r == '.')
- p++; // skip the dot
- }
+ do_server_version_split(server_version, &server_version_split);
+
DBUG_PRINT("info",("Format_description_log_event::server_version_split:"
" '%s' %d %d %d", server_version,
- server_version_split[0],
- server_version_split[1], server_version_split[2]));
+ server_version_split.ver[0],
+ server_version_split.ver[1], server_version_split.ver[2]));
+}
+
+static inline ulong
+version_product(const Format_description_log_event::master_version_split* version_split)
+{
+ return ((version_split->ver[0] * 256 + version_split->ver[1]) * 256
+ + version_split->ver[2]);
}
+/**
+ @return TRUE if the event's version is earlier than one that introduced
+ the replication event checksum. FALSE otherwise.
+*/
+bool
+Format_description_log_event::is_version_before_checksum(master_version_split
+ *version_split)
+{
+ return version_product(version_split) <
+ (version_split->kind == master_version_split::KIND_MARIADB ?
+ checksum_version_product_mariadb : checksum_version_product_mysql);
+}
+
+/**
+ @param buf buffer holding serialized FD event
+ @param len netto (possible checksum is stripped off) length of the event buf
+
+ @return the version-safe checksum alg descriptor where zero
+ designates no checksum, 255 - the originator is
+ checksum-unaware (effectively no checksum) and the actual
+ [1-254] range alg descriptor.
+*/
+uint8 get_checksum_alg(const char* buf, ulong len)
+{
+ uint8 ret;
+ char version[ST_SERVER_VER_LEN];
+ Format_description_log_event::master_version_split version_split;
+
+ DBUG_ENTER("get_checksum_alg");
+ DBUG_ASSERT(buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
+
+ memcpy(version, buf +
+ buf[LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET]
+ + ST_SERVER_VER_OFFSET, ST_SERVER_VER_LEN);
+ version[ST_SERVER_VER_LEN - 1]= 0;
+
+ do_server_version_split(version, &version_split);
+ ret= Format_description_log_event::is_version_before_checksum(&version_split) ?
+ (uint8) BINLOG_CHECKSUM_ALG_UNDEF :
+ * (uint8*) (buf + len - BINLOG_CHECKSUM_LEN - BINLOG_CHECKSUM_ALG_DESC_LEN);
+ DBUG_ASSERT(ret == BINLOG_CHECKSUM_ALG_OFF ||
+ ret == BINLOG_CHECKSUM_ALG_UNDEF ||
+ ret == BINLOG_CHECKSUM_ALG_CRC32);
+ DBUG_RETURN(ret);
+}
+
/**************************************************************************
Load_log_event methods
@@ -5001,6 +5415,7 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
DBUG_PRINT("enter",("new_log_ident: %s pos: %s flags: %lu", new_log_ident_arg,
llstr(pos_arg, buff), (ulong) flags));
#endif
+ cache_type= EVENT_NO_CACHE;
if (flags & DUP_NAME)
new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
if (flags & RELAY_LOG)
@@ -5042,9 +5457,11 @@ bool Rotate_log_event::write(IO_CACHE* file)
{
char buf[ROTATE_HEADER_LEN];
int8store(buf + R_POS_OFFSET, pos);
- return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
- my_b_safe_write(file, (uchar*)buf, ROTATE_HEADER_LEN) ||
- my_b_safe_write(file, (uchar*)new_log_ident, (uint) ident_len));
+ return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
+ wrapper_my_b_safe_write(file, (uchar*) buf, ROTATE_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, (uchar*) new_log_ident,
+ (uint) ident_len) ||
+ write_footer(file));
}
#endif
@@ -5213,7 +5630,8 @@ bool Intvar_log_event::write(IO_CACHE* file)
buf[I_TYPE_OFFSET]= (uchar) type;
int8store(buf + I_VAL_OFFSET, val);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -5341,7 +5759,8 @@ bool Rand_log_event::write(IO_CACHE* file)
int8store(buf + RAND_SEED1_OFFSET, seed1);
int8store(buf + RAND_SEED2_OFFSET, seed2);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -5443,8 +5862,9 @@ Xid_log_event(const char* buf,
bool Xid_log_event::write(IO_CACHE* file)
{
DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
- return write_header(file, sizeof(xid)) ||
- my_b_safe_write(file, (uchar*) &xid, sizeof(xid));
+ return (write_header(file, sizeof(xid)) ||
+ wrapper_my_b_safe_write(file, (uchar*) &xid, sizeof(xid)) ||
+ write_footer(file));
}
#endif
@@ -5663,10 +6083,11 @@ bool User_var_log_event::write(IO_CACHE* file)
event_length= sizeof(buf)+ name_len + buf1_length + val_len;
return (write_header(file, event_length) ||
- my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
- my_b_safe_write(file, (uchar*) name, name_len) ||
- my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
- my_b_safe_write(file, pos, val_len));
+ wrapper_my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
+ wrapper_my_b_safe_write(file, (uchar*) name, name_len) ||
+ wrapper_my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
+ wrapper_my_b_safe_write(file, pos, val_len) ||
+ write_footer(file));
}
#endif
@@ -6424,8 +6845,9 @@ bool Append_block_log_event::write(IO_CACHE* file)
uchar buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id);
return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
- my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
- my_b_safe_write(file, (uchar*) block, block_len));
+ wrapper_my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, (uchar*) block, block_len) ||
+ write_footer(file));
}
#endif
@@ -6579,7 +7001,8 @@ bool Delete_file_log_event::write(IO_CACHE* file)
uchar buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -6676,7 +7099,8 @@ bool Execute_load_log_event::write(IO_CACHE* file)
uchar buf[EXEC_LOAD_HEADER_LEN];
int4store(buf + EL_FILE_ID_OFFSET, file_id);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -6737,16 +7161,17 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
fname);
goto err;
}
- if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
- (pthread_mutex_t*)0,
- rli->relay_log.description_event_for_exec)) ||
+ if (!(lev= (Load_log_event*)
+ Log_event::read_log_event(&file,
+ (pthread_mutex_t*)0,
+ rli->relay_log.description_event_for_exec,
+ opt_slave_sql_verify_checksum)) ||
lev->get_type_code() != NEW_LOAD_EVENT)
{
rli->report(ERROR_LEVEL, 0, "Error in Exec_load event: "
"file '%s' appears corrupted", fname);
goto err;
}
-
lev->thd = thd;
/*
lev->do_apply_event should use rli only for errors i.e. should
@@ -6909,7 +7334,7 @@ Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
int4store(buf + 4, fn_pos_start);
int4store(buf + 4 + 4, fn_pos_end);
*(buf + 4 + 4 + 4)= (uchar) dup_handling;
- return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
+ return wrapper_my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
}
#endif
@@ -7876,11 +8301,11 @@ bool Rows_log_event::write_data_header(IO_CACHE *file)
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (my_b_safe_write(file, buf, 6));
+ return (wrapper_my_b_safe_write(file, buf, 6));
});
int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags);
- return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
+ return (wrapper_my_b_safe_write(file, buf, ROWS_HEADER_LEN));
}
bool Rows_log_event::write_data_body(IO_CACHE*file)
@@ -7896,10 +8321,10 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
- res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
+ res= res || wrapper_my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
- res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
+ res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols.bitmap,
no_bytes_in_map(&m_cols));
/*
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
@@ -7908,11 +8333,11 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
{
DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai));
- res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
+ res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai));
}
DBUG_DUMP("rows", m_rows_buf, data_size);
- res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
+ res= res || wrapper_my_b_safe_write(file, m_rows_buf, (size_t) data_size);
return res;
@@ -7961,13 +8386,15 @@ void Rows_log_event::print_helper(FILE *file,
**************************************************************************/
#ifndef MYSQL_CLIENT
-Annotate_rows_log_event::Annotate_rows_log_event(THD *thd)
+Annotate_rows_log_event::Annotate_rows_log_event(THD *thd,
+ uint16 cache_type_arg)
: Log_event(thd, 0, true),
m_save_thd_query_txt(0),
m_save_thd_query_len(0)
{
m_query_txt= thd->query();
m_query_len= thd->query_length();
+ cache_type= cache_type_arg;
}
#endif
@@ -8015,7 +8442,7 @@ bool Annotate_rows_log_event::write_data_header(IO_CACHE *file)
#ifndef MYSQL_CLIENT
bool Annotate_rows_log_event::write_data_body(IO_CACHE *file)
{
- return my_b_safe_write(file, (uchar*) m_query_txt, m_query_len);
+ return wrapper_my_b_safe_write(file, (uchar*) m_query_txt, m_query_len);
}
#endif
@@ -8590,11 +9017,11 @@ bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (my_b_safe_write(file, buf, 6));
+ return (wrapper_my_b_safe_write(file, buf, 6));
});
int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + TM_FLAGS_OFFSET, m_flags);
- return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
+ return (wrapper_my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
}
bool Table_map_log_event::write_data_body(IO_CACHE *file)
@@ -8618,15 +9045,15 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file)
uchar mbuf[sizeof(m_field_metadata_size)];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
- return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
- my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
- my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
- my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
- my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
- my_b_safe_write(file, m_coltype, m_colcnt) ||
- my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
- my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
- my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
+ return (wrapper_my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
+ wrapper_my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
+ wrapper_my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
+ wrapper_my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
+ wrapper_my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
+ wrapper_my_b_safe_write(file, m_coltype, m_colcnt) ||
+ wrapper_my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
+ wrapper_my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
+ wrapper_my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
}
#endif
@@ -9966,13 +10393,25 @@ Incident_log_event::write_data_header(IO_CACHE *file)
DBUG_PRINT("enter", ("m_incident: %d", m_incident));
uchar buf[sizeof(int16)];
int2store(buf, (int16) m_incident);
- DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf)));
+#ifndef MYSQL_CLIENT
+ DBUG_RETURN(wrapper_my_b_safe_write(file, buf, sizeof(buf)));
+#else
+ DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf)));
+#endif
}
bool
Incident_log_event::write_data_body(IO_CACHE *file)
{
+ uchar tmp[1];
DBUG_ENTER("Incident_log_event::write_data_body");
+ tmp[0]= (uchar) m_message.length;
+ crc= my_checksum(crc, (uchar*) tmp, 1);
+ if (m_message.length > 0)
+ {
+ crc= my_checksum(crc, (uchar*) m_message.str, m_message.length);
+ // todo: report a bug on write_str accepts uint but treats it as uchar
+ }
DBUG_RETURN(write_str(file, m_message.str, (uint) m_message.length));
}
diff --git a/sql/log_event.h b/sql/log_event.h
index 91a2ad9693c..53fc8f986b4 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -72,6 +72,7 @@
#define LOG_READ_MEM -5
#define LOG_READ_TRUNC -6
#define LOG_READ_TOO_LARGE -7
+#define LOG_READ_CHECKSUM_FAILURE -8
#define LOG_EVENT_OFFSET 4
@@ -525,6 +526,22 @@ struct sql_ex_info
#endif
#undef EXPECTED_OPTIONS /* You shouldn't use this one */
+enum enum_binlog_checksum_alg {
+ BINLOG_CHECKSUM_ALG_OFF= 0, // Events are without checksum though its generator
+ // is checksum-capable New Master (NM).
+ BINLOG_CHECKSUM_ALG_CRC32= 1, // CRC32 of zlib algorithm.
+ BINLOG_CHECKSUM_ALG_ENUM_END, // the cut line: valid alg range is [1, 0x7f].
+ BINLOG_CHECKSUM_ALG_UNDEF= 255 // special value to tag undetermined yet checksum
+ // or events from checksum-unaware servers
+};
+
+#define CHECKSUM_CRC32_SIGNATURE_LEN 4
+/**
+ defined statically while there is just one alg implemented
+*/
+#define BINLOG_CHECKSUM_LEN CHECKSUM_CRC32_SIGNATURE_LEN
+#define BINLOG_CHECKSUM_ALG_DESC_LEN 1 /* 1 byte checksum alg descriptor */
+
/**
@enum Log_event_type
@@ -923,6 +940,27 @@ public:
uint16 flags;
bool cache_stmt;
+ /*
+ The revid:alfranio.correia@sun.com-20091103190256-637o8qxlveikrt3i commit
+ ("WL#2687 WL#5072 BUG#40278 BUG#47175") in MySQL 5.5 changes the bool
+ cache_stmt into an enum cache_type. For the backport of WL#2540 binlog
+ event checksum, we need this cache_type member to know if we are writing
+ directly to the log, or into a transaction cache.
+
+ Until the cache_type stuff is merged, we temporarily partially backport
+ the cache_type member, only enough to be able to check if we are writing
+ directly to log or not. Once MySQL 5.5 is merged, this can be removed, and
+ replaced with the MySQL 5.5 code.
+
+ Similarly, in MySQL 5.5 the decision on whether to write directly to log
+ or indirectly through cache is decided differently, and the
+ pre_55_writing_direct() (and all calls to it) are not needed and can be
+ removed once 5.5 is merged.
+ */
+ enum enum_event_cache_type { EVENT_INVALID_CACHE, EVENT_STMT_CACHE,
+ EVENT_TRANSACTIONAL_CACHE, EVENT_NO_CACHE };
+ uint16 cache_type;
+ void pre_55_writing_direct() { cache_type= EVENT_NO_CACHE; }
/**
A storage to cache the global system variable's value.
@@ -930,6 +968,10 @@ public:
*/
ulong slave_exec_mode;
+ /**
+ Placeholder for event checksum while writing to binlog.
+ */
+ ha_checksum crc;
#ifndef MYSQL_CLIENT
THD* thd;
@@ -949,9 +991,10 @@ public:
static Log_event* read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
const Format_description_log_event
- *description_event);
+ *description_event,
+ my_bool crc_check);
static int read_log_event(IO_CACHE* file, String* packet,
- pthread_mutex_t* log_lock);
+ pthread_mutex_t* log_lock, uint8 checksum_alg_arg);
/*
init_show_field_list() prepares the column names and types for the
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
@@ -978,7 +1021,7 @@ public:
/* avoid having to link mysqlbinlog against libpthread */
static Log_event* read_log_event(IO_CACHE* file,
const Format_description_log_event
- *description_event);
+ *description_event, my_bool crc_check);
/* print*() functions are used by mysqlbinlog */
virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0;
void print_timestamp(IO_CACHE* file, time_t *ts = 0);
@@ -987,6 +1030,15 @@ public:
void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
bool is_more);
#endif
+ /*
+ The value is set by caller of FD constructor and
+ Log_event::write_header() for the rest.
+ In the FD case it's propagated into the last byte
+ of post_header_len[] at FD::write().
+ On the slave side the value is assigned from post_header_len[last]
+ of the last seen FD event.
+ */
+ uint8 checksum_alg;
static void *operator new(size_t size)
{
@@ -1001,14 +1053,19 @@ public:
/* Placement version of the above operators */
static void *operator new(size_t, void* ptr) { return ptr; }
static void operator delete(void*, void*) { }
+ bool wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong data_length);
#ifndef MYSQL_CLIENT
bool write_header(IO_CACHE* file, ulong data_length);
+ bool write_footer(IO_CACHE* file);
+ my_bool need_checksum();
+
virtual bool write(IO_CACHE* file)
{
- return (write_header(file, get_data_size()) ||
- write_data_header(file) ||
- write_data_body(file));
+ return(write_header(file, get_data_size()) ||
+ write_data_header(file) ||
+ write_data_body(file) ||
+ write_footer(file));
}
virtual bool write_data_header(IO_CACHE* file)
{ return 0; }
@@ -1058,7 +1115,7 @@ public:
static Log_event* read_log_event(const char* buf, uint event_len,
const char **error,
const Format_description_log_event
- *description_event);
+ *description_event, my_bool crc_check);
/**
Returns the human readable name of the given event type.
*/
@@ -2237,9 +2294,17 @@ public:
*/
uint8 common_header_len;
uint8 number_of_event_types;
- /* The list of post-headers' lengthes */
+ /*
+ The list of post-headers' lengths followed
+ by the checksum alg description byte
+ */
uint8 *post_header_len;
- uchar server_version_split[3];
+ struct master_version_split {
+ enum {KIND_MYSQL, KIND_MARIADB};
+ int kind;
+ uchar ver[3];
+ };
+ master_version_split server_version_split;
const uint8 *event_type_permutation;
Format_description_log_event(uint8 binlog_ver, const char* server_ver=0);
@@ -2271,7 +2336,7 @@ public:
}
void calc_server_version_split();
-
+ static bool is_version_before_checksum(master_version_split *version_split);
protected:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli);
@@ -2326,9 +2391,10 @@ public:
uchar type;
#ifndef MYSQL_CLIENT
- Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg)
- :Log_event(thd_arg,0,0),val(val_arg),type(type_arg)
- {}
+ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg,
+ uint16 cache_type_arg)
+ :Log_event(thd_arg,0,0), val(val_arg), type(type_arg)
+ { cache_type= cache_type_arg; }
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
#endif /* HAVE_REPLICATION */
@@ -2402,9 +2468,10 @@ class Rand_log_event: public Log_event
ulonglong seed2;
#ifndef MYSQL_CLIENT
- Rand_log_event(THD* thd_arg, ulonglong seed1_arg, ulonglong seed2_arg)
- :Log_event(thd_arg,0,0),seed1(seed1_arg),seed2(seed2_arg)
- {}
+ Rand_log_event(THD* thd_arg, ulonglong seed1_arg, ulonglong seed2_arg,
+ uint16 cache_type_arg)
+ :Log_event(thd_arg, 0, 0), seed1(seed1_arg), seed2(seed2_arg)
+ { cache_type= cache_type_arg; }
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
#endif /* HAVE_REPLICATION */
@@ -2448,7 +2515,8 @@ class Xid_log_event: public Log_event
my_xid xid;
#ifndef MYSQL_CLIENT
- Xid_log_event(THD* thd_arg, my_xid x): Log_event(thd_arg,0,0), xid(x) {}
+ Xid_log_event(THD* thd_arg, my_xid x): Log_event(thd_arg, 0, 0), xid(x)
+ { cache_type= EVENT_NO_CACHE; }
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
#endif /* HAVE_REPLICATION */
@@ -2495,10 +2563,11 @@ public:
#ifndef MYSQL_CLIENT
User_var_log_event(THD* thd_arg, char *name_arg, uint name_len_arg,
char *val_arg, ulong val_len_arg, Item_result type_arg,
- uint charset_number_arg)
- :Log_event(), name(name_arg), name_len(name_len_arg), val(val_arg),
- val_len(val_len_arg), type(type_arg), charset_number(charset_number_arg)
- { is_null= !val; }
+ uint charset_number_arg,
+ uint16 cache_type_arg)
+ :Log_event(thd_arg, 0, 0), name(name_arg), name_len(name_len_arg), val(val_arg),
+ val_len(val_len_arg), type(type_arg), charset_number(charset_number_arg)
+ { is_null= !val; cache_type= cache_type_arg; }
void pack_info(Protocol* protocol);
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@@ -3010,7 +3079,7 @@ class Annotate_rows_log_event: public Log_event
{
public:
#ifndef MYSQL_CLIENT
- Annotate_rows_log_event(THD*);
+ Annotate_rows_log_event(THD*, uint16 cache_type_arg);
#endif
Annotate_rows_log_event(const char *buf, uint event_len,
const Format_description_log_event*);
@@ -4040,6 +4109,10 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos,
const char **group_relay_log_name,
ulonglong *relay_log_pos);
+bool event_checksum_test(uchar *buf, ulong event_len, uint8 alg);
+uint8 get_checksum_alg(const char* buf, ulong len);
+extern TYPELIB binlog_checksum_typelib;
+
/**
@} (end of group Replication)
*/
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index c18ade20d02..a549ec77c4a 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -1998,6 +1998,8 @@ extern ulong binlog_cache_size, open_files_limit;
extern ulonglong max_binlog_cache_size;
extern ulong max_binlog_size, max_relay_log_size;
extern ulong opt_binlog_rows_event_max_size;
+extern my_bool opt_master_verify_checksum;
+extern my_bool opt_slave_sql_verify_checksum;
extern ulong rpl_recovery_rank, thread_cache_size, thread_pool_size;
extern ulong back_log;
#endif /* MYSQL_SERVER */
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 9fd6d8c7022..272a11a4c81 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -560,6 +560,8 @@ my_bool opt_noacl;
my_bool sp_automatic_privileges= 1;
ulong opt_binlog_rows_event_max_size;
+my_bool opt_master_verify_checksum= 0;
+my_bool opt_slave_sql_verify_checksum= 1;
const char *binlog_format_names[]= {"MIXED", "STATEMENT", "ROW", NullS};
TYPELIB binlog_format_typelib=
{ array_elements(binlog_format_names) - 1, "",
@@ -4612,6 +4614,7 @@ int main(int argc, char **argv)
#ifndef DBUG_OFF
test_lc_time_sz();
+ srand(time(NULL));
#endif
/*
@@ -6026,7 +6029,9 @@ enum options_mysqld
OPT_SLOW_QUERY_LOG_FILE,
OPT_IGNORE_BUILTIN_INNODB,
OPT_BINLOG_DIRECT_NON_TRANS_UPDATE,
- OPT_DEFAULT_CHARACTER_SET_OLD
+ OPT_DEFAULT_CHARACTER_SET_OLD,
+ OPT_MASTER_VERIFY_CHECKSUM,
+ OPT_SLAVE_SQL_VERIFY_CHECKSUM
};
@@ -6859,6 +6864,19 @@ thread is in the relay logs.",
"not stop for operations that are idempotent. In STRICT mode, replication "
"will stop on any unexpected difference between the master and the slave.",
&slave_exec_mode_str, &slave_exec_mode_str, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"master-verify-checksum", OPT_MASTER_VERIFY_CHECKSUM,
+ "Force checksum verification of logged events in binary log before "
+ "sending them to slaves or printing them in output of SHOW BINLOG EVENTS. "
+ "Disabled by default.",
+ &opt_master_verify_checksum, &opt_master_verify_checksum,
+ 0, GET_BOOL, OPT_ARG, 0, 0, 0, 0, 0, 0},
+ {"slave-sql-verify-checksum", OPT_SLAVE_SQL_VERIFY_CHECKSUM,
+ "Force checksum verification of replication events after reading them "
+ "from relay log. Note: Events are always checksum-verified by slave on "
+ "receiving them from the network before writing them to the relay "
+ "log. Enabled by default.",
+ &opt_slave_sql_verify_checksum, &opt_slave_sql_verify_checksum,
+ 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
#endif
{"slow-query-log", OPT_SLOW_LOG,
"Enable/disable slow query log.", &opt_slow_log,
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index 9b1b1f70784..99536b68cf6 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -244,7 +244,8 @@ static int find_target_pos(LEX_MASTER_INFO *mi, IO_CACHE *log, char *errmsg)
for (;;)
{
Log_event* ev;
- if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*) 0, 0)))
+ if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*) 0, 0,
+ opt_slave_sql_verify_checksum)))
{
if (log->error > 0)
strmov(errmsg, "Binary log truncated in the middle of event");
@@ -418,7 +419,8 @@ static Slave_log_event* find_slave_event(IO_CACHE* log,
for (i = 0; i < 2; i++)
{
- if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0, 0)))
+ if (!(ev = Log_event::read_log_event(log, (pthread_mutex_t*)0, 0,
+ opt_slave_sql_verify_checksum)))
{
my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
"Error reading event in log '%s'",
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 47fc88c9a8a..eb5ee58e538 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -24,7 +24,8 @@
Master_info::Master_info()
:Slave_reporting_capability("I/O"),
- ssl(0), ssl_verify_server_cert(0), fd(-1), io_thd(0), inited(0),
+ ssl(0), ssl_verify_server_cert(0), fd(-1), io_thd(0),
+ checksum_alg_before_fd(BINLOG_CHECKSUM_ALG_UNDEF), inited(0),
abort_slave(0),slave_running(0),
slave_run_id(0)
{
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index d63432545e5..5cac5284d9d 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -83,6 +83,12 @@ class Master_info : public Slave_reporting_capability
Relay_log_info rli;
uint port;
uint connect_retry;
+ /*
+ Holds the checksum alg in use until the IO thread has received FD.
+ Initialized to BINLOG_CHECKSUM_ALG_UNDEF, set to the master's queried
+ @@global.binlog_checksum, and reset to undefined once FD is received.
+ */
+ uint8 checksum_alg_before_fd;
#ifndef DBUG_OFF
int events_till_disconnect;
#endif
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index a2d0b1e4904..0886fd02004 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -175,6 +175,9 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
"use '--relay-log=%s' to avoid this problem.", ln);
name_warning_sent= 1;
}
+
+ rli->relay_log.is_relay_log= TRUE;
+
/*
note, that if open() fails, we'll still have index file open
but a destructor will take care of that
@@ -188,7 +191,6 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
sql_print_error("Failed in open_log() called from init_relay_log_info()");
DBUG_RETURN(1);
}
- rli->relay_log.is_relay_log= TRUE;
}
/* if file does not exist */
@@ -534,8 +536,9 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
Because of we have rli->data_lock and log_lock, we can safely read an
event
*/
- if (!(ev=Log_event::read_log_event(rli->cur_log,0,
- rli->relay_log.description_event_for_exec)))
+ if (!(ev= Log_event::read_log_event(rli->cur_log, 0,
+ rli->relay_log.description_event_for_exec,
+ opt_slave_sql_verify_checksum)))
{
DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
rli->cur_log->error));
diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc
index 6058c473e9f..d4616723117 100644
--- a/sql/rpl_utility.cc
+++ b/sql/rpl_utility.cc
@@ -14,6 +14,8 @@
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "rpl_utility.h"
+
+#ifndef MYSQL_CLIENT
#include "rpl_rli.h"
/*********************************************************************
@@ -224,3 +226,62 @@ table_def::compatible_with(Relay_log_info const *rli_arg, TABLE *table)
return error;
}
+
+#endif /* MYSQL_CLIENT */
+
+
+/**
+ Verify the checksum stored in the trailing BINLOG_CHECKSUM_LEN bytes of a
+ serialized event against one computed with my_checksum() over the rest
+ of the event.
+
+ No verification is done when alg is BINLOG_CHECKSUM_ALG_OFF or
+ BINLOG_CHECKSUM_ALG_UNDEF. For a Format_description (FD) event the
+ LOG_EVENT_BINLOG_IN_USE_F flag is cleared in the buffer while the checksum
+ is computed, and the saved flags byte is written back afterwards.
+
+ @param event_buf pointer to the buffer containing the serialized event;
+ may be modified temporarily as described above
+ @param event_len length of the event, including the checksum bytes when
+ alg enables checksumming
+ @param alg checksum algorithm the event is expected to carry
+
+ @return TRUE if test fails (also forced by the debug keyword
+ simulate_checksum_test_failure)
+ FALSE as success, or when no verification is done
+*/
+bool event_checksum_test(uchar *event_buf, ulong event_len, uint8 alg)
+{
+ bool res= FALSE;
+ uint16 flags= 0; // original flags of an FD event, saved so they can be restored
+
+ if (alg != BINLOG_CHECKSUM_ALG_OFF && alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ ha_checksum incoming;
+ ha_checksum computed;
+
+ if (event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT)
+ {
+#ifndef DBUG_OFF
+ int8 fd_alg= event_buf[event_len - BINLOG_CHECKSUM_LEN -
+ BINLOG_CHECKSUM_ALG_DESC_LEN];
+#endif
+ /*
+ FD event is checksummed and therefore verified w/o the binlog-in-use
+ flag, so the flag is cleared in the buffer before computing.
+ */
+ flags= uint2korr(event_buf + FLAGS_OFFSET);
+ if (flags & LOG_EVENT_BINLOG_IN_USE_F)
+ event_buf[FLAGS_OFFSET] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ /*
+ The only algorithm currently is CRC32. Zero indicates
+ the binlog file is checksum-free *except* the FD-event.
+ */
+ DBUG_ASSERT(fd_alg == BINLOG_CHECKSUM_ALG_CRC32 || fd_alg == 0);
+ DBUG_ASSERT(alg == BINLOG_CHECKSUM_ALG_CRC32);
+ /*
+ Compile time guard to watch over the max number of alg
+ */
+ compile_time_assert(BINLOG_CHECKSUM_ALG_ENUM_END <= 0x80);
+ }
+ incoming= uint4korr(event_buf + event_len - BINLOG_CHECKSUM_LEN);
+ computed= my_checksum(0L, NULL, 0);
+ /* checksum the event content, excluding the checksum part itself */
+ computed= my_checksum(computed, (const uchar*) event_buf,
+ event_len - BINLOG_CHECKSUM_LEN);
+ if (flags != 0) // flags is only ever assigned in the FD branch above
+ {
+ /* restoring the original value of the flags byte of the FD event */
+ DBUG_ASSERT(event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
+ event_buf[FLAGS_OFFSET]= flags;
+ }
+ res= !(computed == incoming);
+ }
+ return DBUG_EVALUATE_IF("simulate_checksum_test_failure", TRUE, res);
+}
diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt
index a872fa1f1e9..aee6927e127 100644
--- a/sql/share/errmsg.txt
+++ b/sql/share/errmsg.txt
@@ -6249,3 +6249,7 @@ ER_UNKNOWN_OPTION
eng "Unknown option '%-.64s'"
ER_BAD_OPTION_VALUE
eng "Incorrect value '%-.64s' for option '%-.64s'"
+ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE
+ eng "Replication event checksum verification failed while reading from network."
+ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE
+ eng "Replication event checksum verification failed while reading from a log file."
diff --git a/sql/slave.cc b/sql/slave.cc
index 63b7ce715c9..b042d463b1e 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -937,6 +937,48 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
}
/*
+ FD_q's (A) is set initially from RL's (A): FD_q.(A) := RL.(A).
+ It's necessary to adjust FD_q.(A) at this point because in the following
+ course FD_q is going to be dumped to RL.
+ Generally FD_q is derived from a received FD_m (roughly FD_q := FD_m)
+ in queue_event and the master's (A) is installed.
+ At one step with the assignment the Relay-Log's checksum alg is set to
+ a new value: RL.(A) := FD_q.(A). If the slave service is stopped
+ the last time assigned RL.(A) will be passed over to the restarting
+ service (to the current execution point).
+ RL.A is a "codec" to verify checksum in queue_event() almost all the time,
+ the exception being the first fake Rotate event.
+ Starting from this point IO thread will execute the following checksum
+ warmup sequence of actions:
+
+ FD_q.A := RL.A,
+ A_m^0 := master.@@global.binlog_checksum,
+ {queue_event(R_f): verifies(R_f, A_m^0)},
+ {queue_event(FD_m): verifies(FD_m, FD_m.A), dump(FD_q), rotate(RL),
+ FD_q := FD_m, RL.A := FD_q.A)}
+
+ See the legend definitions in the MYSQL_BIN_LOG::relay_log_checksum_alg
+ documentation (log.h).
+ In above A_m^0 - the value of master's
+ @@binlog_checksum determined in the upcoming handshake (stored in
+ mi->checksum_alg_before_fd).
+
+
+ After the warm-up sequence IO gets to "normal" checksum verification mode
+ to use RL.A in
+
+ {queue_event(E_m): verifies(E_m, RL.A)}
+
+ until it has received a new FD_m.
+ */
+ mi->rli.relay_log.description_event_for_queue->checksum_alg=
+ mi->rli.relay_log.relay_log_checksum_alg;
+
+ DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg !=
+ BINLOG_CHECKSUM_ALG_UNDEF);
+ DBUG_ASSERT(mi->rli.relay_log.relay_log_checksum_alg !=
+ BINLOG_CHECKSUM_ALG_UNDEF);
+ /*
Compare the master and slave's clock. Do not die if master's clock is
unavailable (very old master not supporting UNIX_TIMESTAMP()?).
*/
@@ -1175,6 +1217,103 @@ when it try to get the value of TIME_ZONE global variable from master.";
}
}
+ /*
+ Querying if master is capable to checksum and notifying it about own
+ CRC-awareness. The master's side instant value of @@global.binlog_checksum
+ is stored in the dump thread's uservar area as well as cached locally
+ to become known in consensus by master and slave.
+ */
+ DBUG_EXECUTE_IF("simulate_slave_unaware_checksum",
+ mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_OFF;
+ goto past_checksum;);
+ {
+ int rc;
+ const char query[]= "SET @master_binlog_checksum= @@global.binlog_checksum";
+ master_res= NULL;
+ mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF; //initially undefined
+ /*
+ @c checksum_alg_before_fd is queried from master in this block.
+ If master is old checksum-unaware the value stays undefined.
+ Once the first FD will be received its alg descriptor will replace
+ the being queried one.
+ */
+ rc= mysql_real_query(mysql, query, strlen(query));
+ if (rc != 0)
+ {
+ if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ goto slave_killed_err;
+
+ if (mysql_errno(mysql) == ER_UNKNOWN_SYSTEM_VARIABLE)
+ {
+ // this is tolerable as OM -> NS is supported
+ mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ "Notifying master by %s failed with "
+ "error: %s", query, mysql_error(mysql));
+ }
+ else
+ {
+ if (is_network_error(mysql_errno(mysql)))
+ {
+ mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ "Notifying master by %s failed with "
+ "error: %s", query, mysql_error(mysql));
+ mysql_free_result(mysql_store_result(mysql));
+ goto network_err;
+ }
+ else
+ {
+ errmsg= "The slave I/O thread stops because a fatal error is encountered "
+ "when it tried to SET @master_binlog_checksum on master.";
+ err_code= ER_SLAVE_FATAL_ERROR;
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ mysql_free_result(mysql_store_result(mysql));
+ goto err;
+ }
+ }
+ }
+ else
+ {
+ mysql_free_result(mysql_store_result(mysql));
+ if (!mysql_real_query(mysql,
+ STRING_WITH_LEN("SELECT @master_binlog_checksum")) &&
+ (master_res= mysql_store_result(mysql)) &&
+ (master_row= mysql_fetch_row(master_res)) &&
+ (master_row[0] != NULL))
+ {
+ mi->checksum_alg_before_fd= (uint8)
+ find_type(master_row[0], &binlog_checksum_typelib, 1) - 1;
+ // valid outcome is either of
+ DBUG_ASSERT(mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_OFF ||
+ mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_CRC32);
+ }
+ else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ goto slave_killed_err;
+ else if (is_network_error(mysql_errno(mysql)))
+ {
+ mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ "Get master BINLOG_CHECKSUM failed with error: %s", mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ errmsg= "The slave I/O thread stops because a fatal error is encountered "
+ "when it tried to SELECT @master_binlog_checksum.";
+ err_code= ER_SLAVE_FATAL_ERROR;
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ mysql_free_result(mysql_store_result(mysql));
+ goto err;
+ }
+ }
+ if (master_res)
+ {
+ mysql_free_result(master_res);
+ master_res= NULL;
+ }
+ }
+
+#ifndef DBUG_OFF
+past_checksum:
+#endif
err:
if (errmsg)
{
@@ -1191,6 +1330,11 @@ network_err:
if (master_res)
mysql_free_result(master_res);
DBUG_RETURN(2);
+
+slave_killed_err:
+ if (master_res)
+ mysql_free_result(master_res);
+ DBUG_RETURN(2);
}
/*
@@ -3384,10 +3528,15 @@ static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
*/
if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
{
+ DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg ==
+ mi->rli.relay_log.relay_log_checksum_alg);
+
delete mi->rli.relay_log.description_event_for_queue;
/* start from format 3 (MySQL 4.0) again */
mi->rli.relay_log.description_event_for_queue= new
Format_description_log_event(3);
+ mi->rli.relay_log.description_event_for_queue->checksum_alg=
+ mi->rli.relay_log.relay_log_checksum_alg;
}
/*
Rotate the relay log makes binlog format detection easier (at next slave
@@ -3440,8 +3589,9 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
Append_block/Exec_load (the SQL thread needs the data, as that thread is not
connected to the master).
*/
- Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
- mi->rli.relay_log.description_event_for_queue);
+ Log_event *ev=
+ Log_event::read_log_event(buf, event_len, &errmsg,
+ mi->rli.relay_log.description_event_for_queue, 0);
if (unlikely(!ev))
{
sql_print_error("Read invalid event from master: '%s',\
@@ -3528,8 +3678,9 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
DBUG_ENTER("queue_binlog_ver_3_event");
/* read_log_event() will adjust log_pos to be end_log_pos */
- Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
- mi->rli.relay_log.description_event_for_queue);
+ Log_event *ev=
+ Log_event::read_log_event(buf,event_len, &errmsg,
+ mi->rli.relay_log.description_event_for_queue, 0);
if (unlikely(!ev))
{
sql_print_error("Read invalid event from master: '%s',\
@@ -3555,6 +3706,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
inc_pos= event_len;
break;
}
+
if (unlikely(rli->relay_log.append(ev)))
{
delete ev;
@@ -3616,7 +3768,68 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
ulong inc_pos;
Relay_log_info *rli= &mi->rli;
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
+ bool unlock_data_lock= TRUE;
+ /*
+ FD_q must have been prepared for the first R_a event
+ inside get_master_version_and_clock()
+ Show-up of FD:s affects checksum_alg at once because
+ that changes FD_queue.
+ */
+ uint8 checksum_alg= mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ?
+ mi->checksum_alg_before_fd :
+ mi->rli.relay_log.relay_log_checksum_alg;
+
+ char *save_buf= NULL; // needed for checksumming the fake Rotate event
+ char rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN];
+
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+
DBUG_ENTER("queue_event");
+ /*
+ FD_queue checksum alg description does not apply in a case of
+ FD itself. The one carries both parts of the checksum data.
+ */
+ if (buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT)
+ {
+ checksum_alg= get_checksum_alg(buf, event_len);
+ }
+ else if (buf[EVENT_TYPE_OFFSET] == START_EVENT_V3)
+ {
+ // checksum behaviour is similar to the pre-checksum FD handling
+ mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF;
+ mi->rli.relay_log.description_event_for_queue->checksum_alg=
+ mi->rli.relay_log.relay_log_checksum_alg= checksum_alg=
+ BINLOG_CHECKSUM_ALG_OFF;
+ }
+
+ // does not hold always because of old binlog can work with NM
+ // DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
+
+ // should hold unless manipulations with RL. Tests that do that
+ // will have to refine the clause.
+ DBUG_ASSERT(mi->rli.relay_log.relay_log_checksum_alg !=
+ BINLOG_CHECKSUM_ALG_UNDEF);
+
+ // Emulate the network corruption
+ DBUG_EXECUTE_IF("corrupt_queue_event",
+ if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
+ {
+ char *debug_event_buf_c = (char*) buf;
+ int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN);
+ debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
+ DBUG_PRINT("info", ("Corrupt the event at queue_event: byte on position %d", debug_cor_pos));
+ DBUG_SET("-d,corrupt_queue_event");
+ }
+ );
+
+ if (event_checksum_test((uchar *) buf, event_len, checksum_alg))
+ {
+ error= ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE;
+ unlock_data_lock= FALSE;
+ goto err;
+ }
LINT_INIT(inc_pos);
@@ -3644,12 +3857,67 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err;
case ROTATE_EVENT:
{
- Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
- if (unlikely(process_io_rotate(mi,&rev)))
+ Rotate_log_event rev(buf, checksum_alg != BINLOG_CHECKSUM_ALG_OFF ?
+ event_len - BINLOG_CHECKSUM_LEN : event_len,
+ mi->rli.relay_log.description_event_for_queue);
+
+ if (unlikely(process_io_rotate(mi, &rev)))
{
error= 1;
goto err;
}
+ /*
+ Checksum special cases for the fake Rotate (R_f) event caused by the protocol
+ of events generation and serialization in RL where Rotate of master is
+ queued right next to FD of slave.
+ Since it's only FD that carries the alg desc, that of FD_s has to apply to R_m.
+ Two special rules apply only to the first R_f which comes in before any FD_m.
+ The 2nd R_f should be compatible with the FD_s that must have taken over
+ the last seen FD_m's (A).
+
+ RSC_1: If OM \and fake Rotate \and slave is configured
+ to compute checksum for its first FD event for RL
+ the fake Rotate gets checksummed here.
+ */
+ if (uint4korr(&buf[0]) == 0 && checksum_alg == BINLOG_CHECKSUM_ALG_OFF &&
+ mi->rli.relay_log.relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_OFF)
+ {
+ ha_checksum rot_crc= my_checksum(0L, NULL, 0);
+ event_len += BINLOG_CHECKSUM_LEN;
+ memcpy(rot_buf, buf, event_len - BINLOG_CHECKSUM_LEN);
+ int4store(&rot_buf[EVENT_LEN_OFFSET],
+ uint4korr(&rot_buf[EVENT_LEN_OFFSET]) + BINLOG_CHECKSUM_LEN);
+ rot_crc= my_checksum(rot_crc, (const uchar *) rot_buf,
+ event_len - BINLOG_CHECKSUM_LEN);
+ int4store(&rot_buf[event_len - BINLOG_CHECKSUM_LEN], rot_crc);
+ DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET]));
+ DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg ==
+ mi->rli.relay_log.relay_log_checksum_alg);
+ /* the first one */
+ DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF);
+ save_buf= (char *) buf;
+ buf= rot_buf;
+ }
+ else
+ /*
+ RSC_2: If NM \and fake Rotate \and slave does not compute checksum
+ the fake Rotate's checksum is stripped off before relay-logging.
+ */
+ if (uint4korr(&buf[0]) == 0 && checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ mi->rli.relay_log.relay_log_checksum_alg == BINLOG_CHECKSUM_ALG_OFF)
+ {
+ event_len -= BINLOG_CHECKSUM_LEN;
+ memcpy(rot_buf, buf, event_len);
+ int4store(&rot_buf[EVENT_LEN_OFFSET],
+ uint4korr(&rot_buf[EVENT_LEN_OFFSET]) - BINLOG_CHECKSUM_LEN);
+ DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET]));
+ DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg ==
+ mi->rli.relay_log.relay_log_checksum_alg);
+ /* the first one */
+ DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF);
+ save_buf= (char *) buf;
+ buf= rot_buf;
+ }
/*
Now the I/O thread has just changed its mi->master_log_name, so
incrementing mi->master_log_pos is nonsense.
@@ -3670,15 +3938,24 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/
Format_description_log_event* tmp;
const char* errmsg;
+ // mark it as undefined that is irrelevant anymore
+ mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF;
if (!(tmp= (Format_description_log_event*)
Log_event::read_log_event(buf, event_len, &errmsg,
- mi->rli.relay_log.description_event_for_queue)))
+ mi->rli.relay_log.description_event_for_queue,
+ 1)))
{
error= 2;
goto err;
}
delete mi->rli.relay_log.description_event_for_queue;
mi->rli.relay_log.description_event_for_queue= tmp;
+ if (tmp->checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
+ tmp->checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
+
+ /* installing new value of checksum Alg for relay log */
+ mi->rli.relay_log.relay_log_checksum_alg= tmp->checksum_alg;
+
/*
Though this does some conversion to the slave's format, this will
preserve the master's binlog format version, and number of event types.
@@ -3755,12 +4032,15 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
else
error= 3;
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
+ if (save_buf != NULL)
+ buf= save_buf;
}
pthread_mutex_unlock(log_lock);
err:
- pthread_mutex_unlock(&mi->data_lock);
+ if (unlock_data_lock)
+ pthread_mutex_unlock(&mi->data_lock);
DBUG_PRINT("info", ("error: %d", error));
DBUG_RETURN(error);
}
@@ -4150,8 +4430,9 @@ static Log_event* next_event(Relay_log_info* rli)
But if the relay log is created by new_file(): then the solution is:
MYSQL_BIN_LOG::open() will write the buffered description event.
*/
- if ((ev=Log_event::read_log_event(cur_log,0,
- rli->relay_log.description_event_for_exec)))
+ if ((ev= Log_event::read_log_event(cur_log,0,
+ rli->relay_log.description_event_for_exec,
+ opt_slave_sql_verify_checksum)))
{
DBUG_ASSERT(thd==rli->sql_thd);
@@ -4552,9 +4833,9 @@ bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report,
{37426, { 5, 1, 0 }, { 5, 1, 26 } },
};
const uchar *master_ver=
- rli->relay_log.description_event_for_exec->server_version_split;
+ rli->relay_log.description_event_for_exec->server_version_split.ver;
- DBUG_ASSERT(sizeof(rli->relay_log.description_event_for_exec->server_version_split) == 3);
+ DBUG_ASSERT(sizeof(rli->relay_log.description_event_for_exec->server_version_split.ver) == 3);
for (uint i= 0;
i < sizeof(versions_for_all_bugs)/sizeof(*versions_for_all_bugs);i++)
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index a59cd631fef..93ade4ffe4e 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -186,7 +186,8 @@ void mysql_client_binlog_statement(THD* thd)
}
ev= Log_event::read_log_event(bufptr, event_len, &error,
- rli->relay_log.description_event_for_exec);
+ rli->relay_log.description_event_for_exec,
+ 0);
DBUG_PRINT("info",("binlog base64 err=%s", error));
if (!ev)
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 2b5c3048534..e38784c472a 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -118,7 +118,7 @@ typedef struct st_user_var_events
#define RP_LOCK_LOG_IS_ALREADY_LOCKED 1
#define RP_FORCE_ROTATE 2
-
+#define RP_BINLOG_CHECKSUM_ALG_CHANGE 4
/*
The COPY_INFO structure is used by INSERT/REPLACE code.
The schema of the row counting by the INSERT/INSERT ... ON DUPLICATE KEY
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 596f0f5c1e6..8b6ba0e44e5 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -28,6 +28,9 @@ my_bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
#endif
+extern TYPELIB binlog_checksum_typelib;
+
+
/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
@@ -47,10 +50,21 @@ static int binlog_dump_count = 0;
*/
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
- ulonglong position, const char** errmsg)
+ ulonglong position, const char** errmsg,
+ uint8 checksum_alg_arg)
{
DBUG_ENTER("fake_rotate_event");
char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
+
+ /*
+ this Rotate is to be sent with checksum if and only if the value of
+ master's @@global.binlog_checksum captured by the slave's handshake in
+ get_master_version_and_clock() was neither OFF nor undefined
+ */
+
+ my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
+ checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
+
/*
'when' (the timestamp) is set to 0 so that slave could distinguish between
real and fake Rotate events (if necessary)
@@ -60,7 +74,8 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
char* p = log_file_name+dirname_length(log_file_name);
uint ident_len = (uint) strlen(p);
- ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
+ ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
+ (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
int4store(header + SERVER_ID_OFFSET, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
@@ -71,7 +86,19 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
packet->append(header, sizeof(header));
int8store(buf+R_POS_OFFSET,position);
packet->append(buf, ROTATE_HEADER_LEN);
- packet->append(p,ident_len);
+ packet->append(p, ident_len);
+
+ if (do_checksum)
+ {
+ char b[BINLOG_CHECKSUM_LEN];
+ ha_checksum crc= my_checksum(0L, NULL, 0);
+ crc= my_checksum(crc, (uchar*)header, sizeof(header));
+ crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
+ crc= my_checksum(crc, (uchar*)p, ident_len);
+ int4store(b, crc);
+ packet->append(b, sizeof(b));
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
*errmsg = "failed on my_net_write()";
@@ -153,6 +180,86 @@ static int send_file(THD *thd)
}
+/**
+ Internal to mysql_binlog_send() routine that recalculates the checksum of
+ a FD event (asserted by its length) that needed additional arrangement
+ prior to sending to the slave.
+
+ The new checksum is computed over the event without its trailing
+ BINLOG_CHECKSUM_LEN bytes and stored into those bytes in place.
+
+ @param packet buffer holding the event
+ @param ev_offset offset of the first byte of the event inside packet
+*/
+inline void fix_checksum(String *packet, ulong ev_offset)
+{
+ /* recalculate the crc for this event */
+ uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
+ ha_checksum crc= my_checksum(0L, NULL, 0);
+ DBUG_ASSERT(data_len ==
+ LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN +
+ BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN);
+ crc= my_checksum(crc, (uchar *)packet->ptr() + ev_offset, data_len -
+ BINLOG_CHECKSUM_LEN);
+ int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc);
+}
+
+/**
+ Look up the user variable named master_binlog_checksum in thd->user_vars.
+
+ @param thd THD whose user variables are searched
+
+ @return the entry returned by my_hash_search() for that name
+ (NOTE(review): presumably NULL when the variable is not set - confirm)
+*/
+static user_var_entry * get_binlog_checksum_uservar(THD * thd)
+{
+ LEX_STRING name= { C_STRING_WITH_LEN("master_binlog_checksum")};
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry;
+}
+
+/**
+ Function for calling in mysql_binlog_send
+ to check if slave initiated checksum-handshake.
+
+ The handshake is detected by the presence of the user variable looked up
+ by get_binlog_checksum_uservar(); the value of the variable is not examined.
+
+ @param[in] thd THD to access a user variable
+
+ @return TRUE if handshake took place, FALSE otherwise
+*/
+
+static bool is_slave_checksum_aware(THD * thd)
+{
+ DBUG_ENTER("is_slave_checksum_aware");
+ user_var_entry *entry= get_binlog_checksum_uservar(thd);
+ DBUG_RETURN(entry? true : false);
+}
+
+/**
+ Function for calling in mysql_binlog_send
+ to get the value of @@binlog_checksum of the master at
+ time of checksum-handshake.
+
+ The value tells the master whether to compute or not, and the slave
+ to verify or not the first artificial Rotate event's checksum.
+
+ The value is read from the user variable looked up by
+ get_binlog_checksum_uservar() and mapped through binlog_checksum_typelib.
+
+ @param[in] thd THD to access a user variable
+
+ @return value of @@binlog_checksum alg according to
+ @c enum enum_binlog_checksum_alg, or BINLOG_CHECKSUM_ALG_UNDEF
+ when the user variable is not found
+*/
+
+static uint8 get_binlog_checksum_value_at_connect(THD * thd)
+{
+ uint8 ret;
+
+ DBUG_ENTER("get_binlog_checksum_value_at_connect");
+ user_var_entry *entry= get_binlog_checksum_uservar(thd);
+ if (!entry)
+ {
+ ret= BINLOG_CHECKSUM_ALG_UNDEF; // variable not found in thd->user_vars
+ }
+ else
+ {
+ DBUG_ASSERT(entry->type == STRING_RESULT);
+ String str;
+ uint dummy_errors;
+ str.copy(entry->value, entry->length, &my_charset_bin, &my_charset_bin,
+ &dummy_errors);
+ ret= (uint8) find_type ((char*) str.ptr(), &binlog_checksum_typelib, 1) - 1; // find_type() result minus one is used as the alg value
+ DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32); // while it's just on CRC32 alg
+ }
+ DBUG_RETURN(ret);
+}
+
/*
Adjust the position pointer in the binary log file for all running slaves
@@ -327,6 +434,9 @@ Increase max_allowed_packet on master";
case LOG_READ_TRUNC:
*errmsg = "binlog truncated in the middle of event";
break;
+ case LOG_READ_CHECKSUM_FAILURE:
+ *errmsg = "event read from binlog did not pass crc check";
+ break;
default:
*errmsg = "unknown error reading log event on the master";
break;
@@ -353,6 +463,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
NET* net = &thd->net;
pthread_mutex_t *log_lock;
bool binlog_can_be_corrupted= FALSE;
+ uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
#endif
@@ -450,7 +562,8 @@ impossible position";
given that we want minimum modification of 4.0, we send the normal
and fake Rotates.
*/
- if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
+ if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
+ get_binlog_checksum_value_at_connect(current_thd)))
{
/*
This error code is not perfect, as fake_rotate_event() does not
@@ -480,8 +593,8 @@ impossible position";
Try to find a Format_description_log_event at the beginning of
the binlog
*/
- if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
- {
+ if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0)))
+ {
/*
The packet has offsets equal to the normal offsets in a binlog
event +1 (the first character is \0).
@@ -491,6 +604,23 @@ impossible position";
(*packet)[EVENT_TYPE_OFFSET+1]));
if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
+ current_checksum_alg= get_checksum_alg(packet->ptr() + 1,
+ packet->length() - 1);
+ DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+ if (!is_slave_checksum_aware(thd) &&
+ current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ errmsg= "Slave can not handle replication events with the checksum "
+ "that master is configured to log";
+ sql_print_warning("Master is configured to log replication events "
+ "with checksum, but will not send such events to "
+ "slaves that cannot process them");
+ goto err;
+ }
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
@@ -506,6 +636,12 @@ impossible position";
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
ST_CREATED_OFFSET+1, (ulong) 0);
+
+ /* fix the checksum due to latest changes in header */
+ if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ fix_checksum(packet, 1);
+
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
@@ -544,7 +680,8 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
- while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
+ while (!(error = Log_event::read_log_event(&log, packet, log_lock,
+ current_checksum_alg)))
{
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
@@ -558,6 +695,23 @@ impossible position";
if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
+ current_checksum_alg= get_checksum_alg(packet->ptr() + 1,
+ packet->length() - 1);
+ DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+ if (!is_slave_checksum_aware(thd) &&
+ current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ errmsg= "Slave can not handle replication events with the checksum "
+ "that master is configured to log";
+ sql_print_warning("Master is configured to log replication events "
+ "with checksum, but will not send such events to "
+ "slaves that cannot process them");
+ goto err;
+ }
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
@@ -594,7 +748,8 @@ impossible position";
here we were reading binlog that was not closed properly (as a result
of a crash ?). treat any corruption as EOF
*/
- if (binlog_can_be_corrupted && error != LOG_READ_MEM)
+ if (binlog_can_be_corrupted &&
+ (error != LOG_READ_MEM && error != LOG_READ_CHECKSUM_FAILURE))
error=LOG_READ_EOF;
/*
TODO: now that we are logging the offset, check to make sure
@@ -649,7 +804,8 @@ impossible position";
*/
pthread_mutex_lock(log_lock);
- switch (error= Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0)) {
+ switch (error= Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0,
+ current_checksum_alg)) {
case 0:
/* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
@@ -750,7 +906,7 @@ impossible position";
*/
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
- &errmsg))
+ &errmsg, current_checksum_alg))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
@@ -1499,7 +1655,8 @@ bool mysql_show_binlog_events(THD* thd)
This code will fail on a mixed relay log (one which has Format_desc then
Rotate then Format_desc).
*/
- ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
+ ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event,
+ opt_master_verify_checksum);
if (ev)
{
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
@@ -1521,8 +1678,12 @@ bool mysql_show_binlog_events(THD* thd)
for (event_count = 0;
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
- description_event)); )
+ description_event,
+ opt_master_verify_checksum)); )
{
+ if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
+ description_event->checksum_alg= ev->checksum_alg;
+
if (event_count >= limit_start &&
ev->net_send(protocol, linfo.log_file_name, pos))
{
@@ -1815,6 +1976,12 @@ static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retrie
&slave_trans_retries);
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
+static sys_var_bool_ptr sys_master_verify_checksum(&vars, /* takes the same &vars argument as the sys_var objects defined just above */
+ "master_verify_checksum", /* presumably the user-visible variable name -- confirm against sys_var_bool_ptr */
+ &opt_master_verify_checksum); /* the flag this patch also passes to Log_event::read_log_event() in mysql_show_binlog_events() */
+static sys_var_bool_ptr sys_slave_sql_verify_checksum(&vars, /* second boolean variable added by the same patch, same &vars argument */
+ "slave_sql_verify_checksum", /* presumably the user-visible variable name -- confirm against sys_var_bool_ptr */
+ &opt_slave_sql_verify_checksum); /* NOTE(review): not read in any hunk of this file shown here; confirm where it is consumed */
bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)