summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc191
1 files changed, 179 insertions, 12 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 596f0f5c1e6..8b6ba0e44e5 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -28,6 +28,9 @@ my_bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
#endif
+extern TYPELIB binlog_checksum_typelib;
+
+
/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
@@ -47,10 +50,21 @@ static int binlog_dump_count = 0;
*/
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
- ulonglong position, const char** errmsg)
+ ulonglong position, const char** errmsg,
+ uint8 checksum_alg_arg)
{
DBUG_ENTER("fake_rotate_event");
char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
+
+ /*
+ this Rotate is to be sent with checksum if and only if
+ slave's get_master_version_and_clock time handshake value
+ of master's @@global.binlog_checksum was TRUE
+ */
+
+ my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
+ checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
+
/*
'when' (the timestamp) is set to 0 so that slave could distinguish between
real and fake Rotate events (if necessary)
@@ -60,7 +74,8 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
char* p = log_file_name+dirname_length(log_file_name);
uint ident_len = (uint) strlen(p);
- ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
+ ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
+ (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
int4store(header + SERVER_ID_OFFSET, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
@@ -71,7 +86,19 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
packet->append(header, sizeof(header));
int8store(buf+R_POS_OFFSET,position);
packet->append(buf, ROTATE_HEADER_LEN);
- packet->append(p,ident_len);
+ packet->append(p, ident_len);
+
+ if (do_checksum)
+ {
+ char b[BINLOG_CHECKSUM_LEN];
+ ha_checksum crc= my_checksum(0L, NULL, 0);
+ crc= my_checksum(crc, (uchar*)header, sizeof(header));
+ crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
+ crc= my_checksum(crc, (uchar*)p, ident_len);
+ int4store(b, crc);
+ packet->append(b, sizeof(b));
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
*errmsg = "failed on my_net_write()";
@@ -153,6 +180,86 @@ static int send_file(THD *thd)
}
+/**
+ Internal to mysql_binlog_send() routine that recalculates checksum for
+ a FD event (asserted) that needs additional arranment prior sending to slave.
+*/
+inline void fix_checksum(String *packet, ulong ev_offset)
+{
+ /* recalculate the crc for this event */
+ uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
+ ha_checksum crc= my_checksum(0L, NULL, 0);
+ DBUG_ASSERT(data_len ==
+ LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN +
+ BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN);
+ crc= my_checksum(crc, (uchar *)packet->ptr() + ev_offset, data_len -
+ BINLOG_CHECKSUM_LEN);
+ int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc);
+}
+
+
+static user_var_entry * get_binlog_checksum_uservar(THD * thd)
+{
+ LEX_STRING name= { C_STRING_WITH_LEN("master_binlog_checksum")};
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry;
+}
+
+/**
+ Function for calling in mysql_binlog_send
+ to check if slave initiated checksum-handshake.
+
+ @param[in] thd THD to access a user variable
+
+ @return TRUE if handshake took place, FALSE otherwise
+*/
+
+static bool is_slave_checksum_aware(THD * thd)
+{
+ DBUG_ENTER("is_slave_checksum_aware");
+ user_var_entry *entry= get_binlog_checksum_uservar(thd);
+ DBUG_RETURN(entry? true : false);
+}
+
+/**
+ Function for calling in mysql_binlog_send
+ to get the value of @@binlog_checksum of the master at
+ time of checksum-handshake.
+
+ The value tells the master whether to compute or not, and the slave
+ to verify or not the first artificial Rotate event's checksum.
+
+ @param[in] thd THD to access a user variable
+
+ @return value of @@binlog_checksum alg according to
+ @c enum enum_binlog_checksum_alg
+*/
+
+static uint8 get_binlog_checksum_value_at_connect(THD * thd)
+{
+ uint8 ret;
+
+ DBUG_ENTER("get_binlog_checksum_value_at_connect");
+ user_var_entry *entry= get_binlog_checksum_uservar(thd);
+ if (!entry)
+ {
+ ret= BINLOG_CHECKSUM_ALG_UNDEF;
+ }
+ else
+ {
+ DBUG_ASSERT(entry->type == STRING_RESULT);
+ String str;
+ uint dummy_errors;
+ str.copy(entry->value, entry->length, &my_charset_bin, &my_charset_bin,
+ &dummy_errors);
+ ret= (uint8) find_type ((char*) str.ptr(), &binlog_checksum_typelib, 1) - 1;
+ DBUG_ASSERT(ret <= BINLOG_CHECKSUM_ALG_CRC32); // while it's just on CRC32 alg
+ }
+ DBUG_RETURN(ret);
+}
+
/*
Adjust the position pointer in the binary log file for all running slaves
@@ -327,6 +434,9 @@ Increase max_allowed_packet on master";
case LOG_READ_TRUNC:
*errmsg = "binlog truncated in the middle of event";
break;
+ case LOG_READ_CHECKSUM_FAILURE:
+ *errmsg = "event read from binlog did not pass crc check";
+ break;
default:
*errmsg = "unknown error reading log event on the master";
break;
@@ -353,6 +463,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
NET* net = &thd->net;
pthread_mutex_t *log_lock;
bool binlog_can_be_corrupted= FALSE;
+ uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
#endif
@@ -450,7 +562,8 @@ impossible position";
given that we want minimum modification of 4.0, we send the normal
and fake Rotates.
*/
- if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
+ if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
+ get_binlog_checksum_value_at_connect(current_thd)))
{
/*
This error code is not perfect, as fake_rotate_event() does not
@@ -480,8 +593,8 @@ impossible position";
Try to find a Format_description_log_event at the beginning of
the binlog
*/
- if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
- {
+ if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0)))
+ {
/*
The packet has offsets equal to the normal offsets in a binlog
event +1 (the first character is \0).
@@ -491,6 +604,23 @@ impossible position";
(*packet)[EVENT_TYPE_OFFSET+1]));
if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
+ current_checksum_alg= get_checksum_alg(packet->ptr() + 1,
+ packet->length() - 1);
+ DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+ if (!is_slave_checksum_aware(thd) &&
+ current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ errmsg= "Slave can not handle replication events with the checksum "
+ "that master is configured to log";
+ sql_print_warning("Master is configured to log replication events "
+ "with checksum, but will not send such events to "
+ "slaves that cannot process them");
+ goto err;
+ }
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
@@ -506,6 +636,12 @@ impossible position";
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
ST_CREATED_OFFSET+1, (ulong) 0);
+
+ /* fix the checksum due to latest changes in header */
+ if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ fix_checksum(packet, 1);
+
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
@@ -544,7 +680,8 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
- while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
+ while (!(error = Log_event::read_log_event(&log, packet, log_lock,
+ current_checksum_alg)))
{
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
@@ -558,6 +695,23 @@ impossible position";
if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
+ current_checksum_alg= get_checksum_alg(packet->ptr() + 1,
+ packet->length() - 1);
+ DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+ if (!is_slave_checksum_aware(thd) &&
+ current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ errmsg= "Slave can not handle replication events with the checksum "
+ "that master is configured to log";
+ sql_print_warning("Master is configured to log replication events "
+ "with checksum, but will not send such events to "
+ "slaves that cannot process them");
+ goto err;
+ }
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
@@ -594,7 +748,8 @@ impossible position";
here we were reading binlog that was not closed properly (as a result
of a crash ?). treat any corruption as EOF
*/
- if (binlog_can_be_corrupted && error != LOG_READ_MEM)
+ if (binlog_can_be_corrupted &&
+ (error != LOG_READ_MEM && error != LOG_READ_CHECKSUM_FAILURE))
error=LOG_READ_EOF;
/*
TODO: now that we are logging the offset, check to make sure
@@ -649,7 +804,8 @@ impossible position";
*/
pthread_mutex_lock(log_lock);
- switch (error= Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0)) {
+ switch (error= Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0,
+ current_checksum_alg)) {
case 0:
/* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
@@ -750,7 +906,7 @@ impossible position";
*/
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
- &errmsg))
+ &errmsg, current_checksum_alg))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
@@ -1499,7 +1655,8 @@ bool mysql_show_binlog_events(THD* thd)
This code will fail on a mixed relay log (one which has Format_desc then
Rotate then Format_desc).
*/
- ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
+ ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event,
+ opt_master_verify_checksum);
if (ev)
{
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
@@ -1521,8 +1678,12 @@ bool mysql_show_binlog_events(THD* thd)
for (event_count = 0;
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
- description_event)); )
+ description_event,
+ opt_master_verify_checksum)); )
{
+ if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
+ description_event->checksum_alg= ev->checksum_alg;
+
if (event_count >= limit_start &&
ev->net_send(protocol, linfo.log_file_name, pos))
{
@@ -1815,6 +1976,12 @@ static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retrie
&slave_trans_retries);
static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
+static sys_var_bool_ptr sys_master_verify_checksum(&vars,
+ "master_verify_checksum",
+ &opt_master_verify_checksum);
+static sys_var_bool_ptr sys_slave_sql_verify_checksum(&vars,
+ "slave_sql_verify_checksum",
+ &opt_slave_sql_verify_checksum);
bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)