summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc1515
1 files changed, 740 insertions, 775 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 95438b69348..2bc9b6559e0 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -49,6 +49,7 @@
#include <base64.h>
#include <my_bitmap.h>
#include "rpl_utility.h"
+#include "rpl_constants.h"
#include "sql_digest.h"
#define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1))
@@ -518,6 +519,28 @@ pretty_print_str(String *packet, const char *str, int len)
}
#endif /* !MYSQL_CLIENT */
+#ifndef DBUG_OFF
+#define DBUG_DUMP_EVENT_BUF(B,L) \
+ do { \
+ const uchar *_buf=(uchar*)(B); \
+ size_t _len=(L); \
+ if (_len >= LOG_EVENT_MINIMAL_HEADER_LEN) \
+ { \
+ DBUG_PRINT("data", ("header: timestamp:%u type:%u server_id:%u len:%u log_pos:%u flags:%u", \
+ uint4korr(_buf), _buf[EVENT_TYPE_OFFSET], \
+ uint4korr(_buf+SERVER_ID_OFFSET), \
+ uint4korr(_buf+EVENT_LEN_OFFSET), \
+ uint4korr(_buf+LOG_POS_OFFSET), \
+ uint4korr(_buf+FLAGS_OFFSET))); \
+ DBUG_DUMP("data", _buf+LOG_EVENT_MINIMAL_HEADER_LEN, \
+ _len-LOG_EVENT_MINIMAL_HEADER_LEN); \
+ } \
+ else \
+ DBUG_DUMP("data", _buf, _len); \
+ } while(0)
+#else
+#define DBUG_DUMP_EVENT_BUF(B,L) do { } while(0)
+#endif
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
@@ -645,19 +668,6 @@ static void cleanup_load_tmpdir(LEX_STRING *connection_name)
/*
- write_str()
-*/
-
-static bool write_str(IO_CACHE *file, const char *str, uint length)
-{
- uchar tmp[1];
- tmp[0]= (uchar) length;
- return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
- my_b_safe_write(file, (uchar*) str, length));
-}
-
-
-/*
read_str()
*/
@@ -806,6 +816,7 @@ const char* Log_event::get_type_str(Log_event_type type)
case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint";
case GTID_EVENT: return "Gtid";
case GTID_LIST_EVENT: return "Gtid_list";
+ case START_ENCRYPTION_EVENT: return "Start_encryption";
default: return "Unknown"; /* impossible */
}
}
@@ -822,8 +833,7 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
- :log_pos(0), temp_buf(0), exec_time(0),
- crc(0), thd(thd_arg),
+ :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg),
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
server_id= thd->variables.server_id;
@@ -847,8 +857,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
*/
Log_event::Log_event()
- :temp_buf(0), exec_time(0), flags(0),
- cache_type(Log_event::EVENT_INVALID_CACHE), crc(0),
+ :temp_buf(0), exec_time(0), flags(0), cache_type(EVENT_INVALID_CACHE),
thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
server_id= global_system_variables.server_id;
@@ -870,7 +879,7 @@ Log_event::Log_event()
Log_event::Log_event(const char* buf,
const Format_description_log_event* description_event)
:temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE),
- crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
+ checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
#ifndef MYSQL_CLIENT
thd = 0;
@@ -1034,18 +1043,31 @@ int Log_event::net_send(THD *thd, Protocol *protocol, const char* log_name,
EVENTS.
*/
-void Log_event::init_show_field_list(List<Item>* field_list)
-{
- field_list->push_back(new Item_empty_string("Log_name", 20));
- field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
- MYSQL_TYPE_LONGLONG));
- field_list->push_back(new Item_empty_string("Event_type", 20));
- field_list->push_back(new Item_return_int("Server_id", 10,
- MYSQL_TYPE_LONG));
- field_list->push_back(new Item_return_int("End_log_pos",
- MY_INT32_NUM_DECIMAL_DIGITS,
- MYSQL_TYPE_LONGLONG));
- field_list->push_back(new Item_empty_string("Info", 20));
+void Log_event::init_show_field_list(THD *thd, List<Item>* field_list)
+{
+ MEM_ROOT *mem_root= thd->mem_root;
+ field_list->push_back(new (mem_root)
+ Item_empty_string(thd, "Log_name", 20),
+ mem_root);
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "Pos",
+ MY_INT32_NUM_DECIMAL_DIGITS,
+ MYSQL_TYPE_LONGLONG),
+ mem_root);
+ field_list->push_back(new (mem_root)
+ Item_empty_string(thd, "Event_type", 20),
+ mem_root);
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "Server_id", 10,
+ MYSQL_TYPE_LONG),
+ mem_root);
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "End_log_pos",
+ MY_INT32_NUM_DECIMAL_DIGITS,
+ MYSQL_TYPE_LONGLONG),
+ mem_root);
+ field_list->push_back(new (mem_root) Item_empty_string(thd, "Info", 20),
+ mem_root);
}
/**
@@ -1076,12 +1098,14 @@ my_bool Log_event::need_checksum()
and Stop event)
provides their checksum alg preference through Log_event::checksum_alg.
*/
- ret= ((checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
- (checksum_alg != BINLOG_CHECKSUM_ALG_OFF) :
- ((binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF) &&
- (cache_type == Log_event::EVENT_NO_CACHE)) ?
- MY_TEST(binlog_checksum_options) : FALSE);
-
+ if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ ret= checksum_alg != BINLOG_CHECKSUM_ALG_OFF;
+ else
+ {
+ ret= binlog_checksum_options && cache_type == Log_event::EVENT_NO_CACHE;
+ checksum_alg= ret ? (enum_binlog_checksum_alg)binlog_checksum_options
+ : BINLOG_CHECKSUM_ALG_OFF;
+ }
/*
FD calls the methods before data_written has been calculated.
The following invariant claims if the current is not the first
@@ -1092,10 +1116,6 @@ my_bool Log_event::need_checksum()
DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret ||
data_written == 0);
- if (checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
- checksum_alg= ret ? // calculated value stored
- (uint8) binlog_checksum_options : (uint8) BINLOG_CHECKSUM_ALG_OFF;
-
DBUG_ASSERT(!ret ||
((checksum_alg == binlog_checksum_options ||
/*
@@ -1110,8 +1130,9 @@ my_bool Log_event::need_checksum()
the local RL's Rotate and the master's Rotate
which IO thread instantiates via queue_binlog_ver_3_event.
*/
- get_type_code() == ROTATE_EVENT
- || /* FD is always checksummed */
+ get_type_code() == ROTATE_EVENT ||
+ get_type_code() == START_ENCRYPTION_EVENT ||
+ /* FD is always checksummed */
get_type_code() == FORMAT_DESCRIPTION_EVENT) &&
checksum_alg != BINLOG_CHECKSUM_ALG_OFF));
@@ -1125,51 +1146,145 @@ my_bool Log_event::need_checksum()
DBUG_RETURN(ret);
}
-bool Log_event::wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong size)
+int Log_event_writer::write_internal(const uchar *pos, size_t len)
+{
+ if (my_b_safe_write(file, pos, len))
+ return 1;
+ bytes_written+= len;
+ return 0;
+}
+
+/*
+ as soon as encryption produces the first output block, write event_len
+ where it should be in a valid event header
+*/
+int Log_event_writer::maybe_write_event_len(uchar *pos, size_t len)
+{
+ if (len && event_len)
+ {
+ DBUG_ASSERT(len >= EVENT_LEN_OFFSET);
+ if (write_internal(pos + EVENT_LEN_OFFSET - 4, 4))
+ return 1;
+ int4store(pos + EVENT_LEN_OFFSET - 4, event_len);
+ event_len= 0;
+ }
+ return 0;
+}
+
+int Log_event_writer::encrypt_and_write(const uchar *pos, size_t len)
{
- if (need_checksum() && size != 0)
- crc= my_checksum(crc, buf, size);
+ uchar *dst= 0;
+ size_t dstsize= 0;
+
+ if (ctx)
+ {
+ dstsize= encryption_encrypted_length(len, ENCRYPTION_KEY_SYSTEM_DATA,
+ crypto->key_version);
+ if (!(dst= (uchar*)my_safe_alloca(dstsize)))
+ return 1;
- return my_b_safe_write(file, buf, size);
+ uint dstlen;
+ if (encryption_ctx_update(ctx, pos, len, dst, &dstlen))
+ goto err;
+ if (maybe_write_event_len(dst, dstlen))
+ return 1;
+ pos= dst;
+ len= dstlen;
+ }
+ if (write_internal(pos, len))
+ goto err;
+
+ my_safe_afree(dst, dstsize);
+ return 0;
+err:
+ my_safe_afree(dst, dstsize);
+ return 1;
}
-bool Log_event::write_footer(IO_CACHE* file)
+int Log_event_writer::write_header(uchar *pos, size_t len)
{
+ DBUG_ENTER("Log_event_writer::write_header");
/*
- footer contains the checksum-algorithm descriptor
- followed by the checksum value
+ recording checksum of FD event computed with dropped
+ possibly active LOG_EVENT_BINLOG_IN_USE_F flag.
+ Similar step at verication: the active flag is dropped before
+ checksum computing.
*/
- if (need_checksum())
+ if (checksum_len)
{
- uchar buf[BINLOG_CHECKSUM_LEN];
- int4store(buf, crc);
- return (my_b_safe_write(file, (uchar*) buf, sizeof(buf)));
+ uchar save=pos[FLAGS_OFFSET];
+ pos[FLAGS_OFFSET]&= ~LOG_EVENT_BINLOG_IN_USE_F;
+ crc= my_checksum(0, pos, len);
+ pos[FLAGS_OFFSET]= save;
}
- return 0;
+
+ if (ctx)
+ {
+ uchar iv[BINLOG_IV_LENGTH];
+ crypto->set_iv(iv, my_b_safe_tell(file));
+ if (encryption_ctx_init(ctx, crypto->key, crypto->key_length,
+ iv, sizeof(iv), ENCRYPTION_FLAG_ENCRYPT | ENCRYPTION_FLAG_NOPAD,
+ ENCRYPTION_KEY_SYSTEM_DATA, crypto->key_version))
+ DBUG_RETURN(1);
+
+ DBUG_ASSERT(len >= LOG_EVENT_HEADER_LEN);
+ event_len= uint4korr(pos + EVENT_LEN_OFFSET);
+ DBUG_ASSERT(event_len >= len);
+ memcpy(pos + EVENT_LEN_OFFSET, pos, 4);
+ pos+= 4;
+ len-= 4;
+ }
+ DBUG_RETURN(encrypt_and_write(pos, len));
+}
+
+int Log_event_writer::write_data(const uchar *pos, size_t len)
+{
+ DBUG_ENTER("Log_event_writer::write_data");
+ if (checksum_len)
+ crc= my_checksum(crc, pos, len);
+
+ DBUG_RETURN(encrypt_and_write(pos, len));
+}
+
+int Log_event_writer::write_footer()
+{
+ DBUG_ENTER("Log_event_writer::write_footer");
+ if (checksum_len)
+ {
+ uchar checksum_buf[BINLOG_CHECKSUM_LEN];
+ int4store(checksum_buf, crc);
+ if (encrypt_and_write(checksum_buf, BINLOG_CHECKSUM_LEN))
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
+ }
+ if (ctx)
+ {
+ uint dstlen;
+ uchar dst[MY_AES_BLOCK_SIZE*2];
+ if (encryption_ctx_finish(ctx, dst, &dstlen))
+ DBUG_RETURN(1);
+ if (maybe_write_event_len(dst, dstlen) || write_internal(dst, dstlen))
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
+ }
+ DBUG_RETURN(0);
}
/*
- Log_event::write()
+ Log_event::write_header()
*/
-bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
+bool Log_event::write_header(ulong event_data_length)
{
uchar header[LOG_EVENT_HEADER_LEN];
ulong now;
- bool ret;
DBUG_ENTER("Log_event::write_header");
DBUG_PRINT("enter", ("filepos: %lld length: %lu type: %d",
- (longlong) my_b_tell(file), event_data_length,
+ (longlong) writer->pos(), event_data_length,
(int) get_type_code()));
- /* Store number of bytes that will be written by this event */
- data_written= event_data_length + sizeof(header);
+ writer->checksum_len= need_checksum() ? BINLOG_CHECKSUM_LEN : 0;
- if (need_checksum())
- {
- crc= my_checksum(0L, NULL, 0);
- data_written += BINLOG_CHECKSUM_LEN;
- }
+ /* Store number of bytes that will be written by this event */
+ data_written= event_data_length + sizeof(header) + writer->checksum_len;
/*
log_pos != 0 if this is relay-log event. In this case we should not
@@ -1187,32 +1302,11 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
else if (!log_pos)
{
/*
- Calculate position of end of event
-
- Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
- work well. So this will give slightly wrong positions for the
- Format_desc/Rotate/Stop events which the slave writes to its
- relay log. For example, the initial Format_desc will have
- end_log_pos=91 instead of 95. Because after writing the first 4
- bytes of the relay log, my_b_tell() still reports 0. Because
- my_b_append() does not update the counter which my_b_tell()
- later uses (one should probably use my_b_append_tell() to work
- around this). To get right positions even when writing to the
- relay log, we use the (new) my_b_safe_tell().
-
- Note that this raises a question on the correctness of all these
- DBUG_ASSERT(my_b_tell()=rli->event_relay_log_pos).
-
- If in a transaction, the log_pos which we calculate below is not
- very good (because then my_b_safe_tell() returns start position
- of the BEGIN, so it's like the statement was at the BEGIN's
- place), but it's not a very serious problem (as the slave, when
- it is in a transaction, does not take those end_log_pos into
- account (as it calls inc_event_relay_log_pos()). To be fixed
- later, so that it looks less strange. But not bug.
+ Calculate the position of where the next event will start
+ (end of this event, that is).
*/
- log_pos= my_b_safe_tell(file)+data_written;
+ log_pos= writer->pos() + data_written;
}
now= get_time(); // Query start time
@@ -1229,61 +1323,33 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
int4store(header+ SERVER_ID_OFFSET, server_id);
int4store(header+ EVENT_LEN_OFFSET, data_written);
int4store(header+ LOG_POS_OFFSET, log_pos);
- /*
- recording checksum of FD event computed with dropped
- possibly active LOG_EVENT_BINLOG_IN_USE_F flag.
- Similar step at verication: the active flag is dropped before
- checksum computing.
- */
- if (header[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT ||
- !need_checksum() || !(flags & LOG_EVENT_BINLOG_IN_USE_F))
- {
- int2store(header+ FLAGS_OFFSET, flags);
- ret= wrapper_my_b_safe_write(file, header, sizeof(header)) != 0;
- }
- else
- {
- ret= (wrapper_my_b_safe_write(file, header, FLAGS_OFFSET) != 0);
- if (!ret)
- {
- flags &= ~LOG_EVENT_BINLOG_IN_USE_F;
- int2store(header + FLAGS_OFFSET, flags);
- crc= my_checksum(crc, header + FLAGS_OFFSET, sizeof(flags));
- flags |= LOG_EVENT_BINLOG_IN_USE_F;
- int2store(header + FLAGS_OFFSET, flags);
- ret= (my_b_safe_write(file, header + FLAGS_OFFSET, sizeof(flags)) != 0);
- }
- if (!ret)
- ret= (wrapper_my_b_safe_write(file, header + FLAGS_OFFSET + sizeof(flags),
- sizeof(header)
- - (FLAGS_OFFSET + sizeof(flags))) != 0);
- }
- DBUG_RETURN( ret);
+ int2store(header + FLAGS_OFFSET, flags);
+
+ bool ret= writer->write_header(header, sizeof(header));
+ DBUG_RETURN(ret);
}
+#endif /* !MYSQL_CLIENT */
/**
- This needn't be format-tolerant, because we only read
- LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
+ This needn't be format-tolerant, because we only parse the first
+ LOG_EVENT_MINIMAL_HEADER_LEN bytes (just need the event's length).
*/
int Log_event::read_log_event(IO_CACHE* file, String* packet,
- mysql_mutex_t* log_lock,
- uint8 checksum_alg_arg,
- const char *log_file_name_arg,
- bool* is_binlog_active)
+ const Format_description_log_event *fdle,
+ enum enum_binlog_checksum_alg checksum_alg_arg)
{
ulong data_len;
- int result=0;
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
uchar ev_offset= packet->length();
- DBUG_ENTER("Log_event::read_log_event");
-
- if (log_lock)
- mysql_mutex_lock(log_lock);
-
- if (log_file_name_arg)
- *is_binlog_active= mysql_bin_log.is_active(log_file_name_arg);
+#ifndef max_allowed_packet
+ THD *thd=current_thd;
+ ulong max_allowed_packet= thd ? thd->slave_thread ? slave_max_allowed_packet
+ : thd->variables.max_allowed_packet
+ : ~(uint)0;
+#endif
+ DBUG_ENTER("Log_event::read_log_event(IO_CACHE*,String*...)");
if (my_b_read(file, (uchar*) buf, sizeof(buf)))
{
@@ -1293,185 +1359,157 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
update to the log.
*/
DBUG_PRINT("error",("file->error: %d", file->error));
- if (!file->error)
- result= LOG_READ_EOF;
- else
- result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
- goto end;
+ DBUG_RETURN(file->error == 0 ? LOG_READ_EOF :
+ file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
}
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
- if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
- data_len > max(current_thd->variables.max_allowed_packet,
- opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
- {
- DBUG_PRINT("error",("data_len: %lu", data_len));
- result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
- LOG_READ_TOO_LARGE);
- goto end;
- }
/* Append the log event header to packet */
if (packet->append(buf, sizeof(buf)))
- {
- /* Failed to allocate packet */
- result= LOG_READ_MEM;
- goto end;
- }
- data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
- if (data_len)
+ DBUG_RETURN(LOG_READ_MEM);
+
+ if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN)
+ DBUG_RETURN(LOG_READ_BOGUS);
+
+ if (data_len > max(max_allowed_packet,
+ opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
+ DBUG_RETURN(LOG_READ_TOO_LARGE);
+
+ if (likely(data_len > LOG_EVENT_MINIMAL_HEADER_LEN))
{
/* Append rest of event, read directly from file into packet */
- if (packet->append(file, data_len))
+ if (packet->append(file, data_len - LOG_EVENT_MINIMAL_HEADER_LEN))
{
/*
Fatal error occured when appending rest of the event
to packet, possible failures:
1. EOF occured when reading from file, it's really an error
- as data_len is >=0 there's supposed to be more bytes available.
+ as there's supposed to be more bytes available.
file->error will have been set to number of bytes left to read
2. Read was interrupted, file->error would normally be set to -1
3. Failed to allocate memory for packet, my_errno
- will be ENOMEM(file->error shuold be 0, but since the
+ will be ENOMEM(file->error should be 0, but since the
memory allocation occurs before the call to read it might
be uninitialized)
*/
- result= (my_errno == ENOMEM ? LOG_READ_MEM :
- (file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
- /* Implicit goto end; */
- }
- else
- {
- /* Corrupt the event for Dump thread*/
- DBUG_EXECUTE_IF("corrupt_read_log_event2",
- uchar *debug_event_buf_c = (uchar*) packet->ptr() + ev_offset;
- if (debug_event_buf_c[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
- {
- int debug_cor_pos = rand() % (data_len + sizeof(buf) - BINLOG_CHECKSUM_LEN);
- debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
- DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event: byte on position %d", debug_cor_pos));
- DBUG_SET("-d,corrupt_read_log_event2");
- }
- );
- /*
- CRC verification of the Dump thread
- */
- if (opt_master_verify_checksum &&
- event_checksum_test((uchar*) packet->ptr() + ev_offset,
- data_len + sizeof(buf),
- checksum_alg_arg))
- {
- result= LOG_READ_CHECKSUM_FAILURE;
- goto end;
- }
+ DBUG_RETURN(my_errno == ENOMEM ? LOG_READ_MEM :
+ (file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
}
}
-end:
- if (log_lock)
- mysql_mutex_unlock(log_lock);
- DBUG_RETURN(result);
-}
-#endif /* !MYSQL_CLIENT */
+ if (fdle->crypto_data.scheme)
+ {
+ uchar iv[BINLOG_IV_LENGTH];
+ fdle->crypto_data.set_iv(iv, my_b_tell(file) - data_len);
-#ifndef MYSQL_CLIENT
-#define UNLOCK_MUTEX if (log_lock) mysql_mutex_unlock(log_lock);
-#define LOCK_MUTEX if (log_lock) mysql_mutex_lock(log_lock);
-#else
-#define UNLOCK_MUTEX
-#define LOCK_MUTEX
-#endif
+ char *newpkt= (char*)my_malloc(data_len + ev_offset + 1, MYF(MY_WME));
+ if (!newpkt)
+ DBUG_RETURN(LOG_READ_MEM);
+ memcpy(newpkt, packet->ptr(), ev_offset);
+
+ uint dstlen;
+ uchar *src= (uchar*)packet->ptr() + ev_offset;
+ uchar *dst= (uchar*)newpkt + ev_offset;
+ memcpy(src + EVENT_LEN_OFFSET, src, 4);
+ if (encryption_crypt(src + 4, data_len - 4, dst + 4, &dstlen,
+ fdle->crypto_data.key, fdle->crypto_data.key_length, iv,
+ sizeof(iv), ENCRYPTION_FLAG_DECRYPT | ENCRYPTION_FLAG_NOPAD,
+ ENCRYPTION_KEY_SYSTEM_DATA, fdle->crypto_data.key_version))
+ {
+ my_free(newpkt);
+ DBUG_RETURN(LOG_READ_DECRYPT);
+ }
+ DBUG_ASSERT(dstlen == data_len - 4);
+ memcpy(dst, dst + EVENT_LEN_OFFSET, 4);
+ int4store(dst + EVENT_LEN_OFFSET, data_len);
+ packet->reset(newpkt, data_len + ev_offset, data_len + ev_offset + 1,
+ &my_charset_bin);
+ }
-#ifndef MYSQL_CLIENT
-/**
- @note
- Allocates memory; The caller is responsible for clean-up.
-*/
-Log_event* Log_event::read_log_event(IO_CACHE* file,
- mysql_mutex_t* log_lock,
- const Format_description_log_event
- *description_event,
- my_bool crc_check)
-#else
-Log_event* Log_event::read_log_event(IO_CACHE* file,
- const Format_description_log_event
- *description_event,
- my_bool crc_check)
-#endif
-{
- DBUG_ENTER("Log_event::read_log_event");
- DBUG_ASSERT(description_event != 0);
- char head[LOG_EVENT_MINIMAL_HEADER_LEN];
/*
- First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
- check the event for sanity and to know its length; no need to really parse
- it. We say "at most" because this could be a 3.23 master, which has header
- of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
- "minimal" over the set {MySQL >=4.0}).
+ CRC verification of the Dump thread
*/
- uint header_size= MY_MIN(description_event->common_header_len,
- LOG_EVENT_MINIMAL_HEADER_LEN);
-
- LOCK_MUTEX;
- DBUG_PRINT("info", ("my_b_tell: %lu", (ulong) my_b_tell(file)));
- if (my_b_read(file, (uchar *) head, header_size))
+ if (data_len > LOG_EVENT_MINIMAL_HEADER_LEN)
{
- DBUG_PRINT("info", ("Log_event::read_log_event(IO_CACHE*,Format_desc*) \
-failed my_b_read"));
- UNLOCK_MUTEX;
- /*
- No error here; it could be that we are at the file's end. However
- if the next my_b_read() fails (below), it will be an error as we
- were able to read the first bytes.
- */
- DBUG_RETURN(0);
+ /* Corrupt the event for Dump thread*/
+ DBUG_EXECUTE_IF("corrupt_read_log_event2",
+ uchar *debug_event_buf_c = (uchar*) packet->ptr() + ev_offset;
+ if (debug_event_buf_c[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
+ {
+ int debug_cor_pos = rand() % (data_len - BINLOG_CHECKSUM_LEN);
+ debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
+ DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event: byte on position %d", debug_cor_pos));
+ DBUG_SET("-d,corrupt_read_log_event2");
+ }
+ );
+ if (event_checksum_test((uchar*) packet->ptr() + ev_offset,
+ data_len, checksum_alg_arg))
+ DBUG_RETURN(LOG_READ_CHECKSUM_FAILURE);
}
- ulong data_len = uint4korr(head + EVENT_LEN_OFFSET);
- char *buf= 0;
+ DBUG_RETURN(0);
+}
+
+Log_event* Log_event::read_log_event(IO_CACHE* file, mysql_mutex_t* log_lock,
+ const Format_description_log_event *fdle,
+ my_bool crc_check)
+{
+ DBUG_ENTER("Log_event::read_log_event(IO_CACHE*,Format_description_log_event*...)");
+ DBUG_ASSERT(fdle != 0);
+ String event;
const char *error= 0;
- Log_event *res= 0;
-#ifndef max_allowed_packet
- THD *thd=current_thd;
- uint max_allowed_packet= thd ? slave_max_allowed_packet:~(uint)0;
-#endif
+ Log_event *res= 0;
- if (data_len > max<ulong>(max_allowed_packet,
- opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
- {
- error = "Event too big";
- goto err;
- }
+ if (log_lock)
+ mysql_mutex_lock(log_lock);
- if (data_len < header_size)
+ switch (read_log_event(file, &event, fdle, BINLOG_CHECKSUM_ALG_OFF))
{
- error = "Event too small";
- goto err;
+ case 0:
+ break;
+ case LOG_READ_EOF: // no error here; we are at the file's end
+ goto err;
+ case LOG_READ_BOGUS:
+ error= "Event invalid";
+ goto err;
+ case LOG_READ_IO:
+ error= "read error";
+ goto err;
+ case LOG_READ_MEM:
+ error= "Out of memory";
+ goto err;
+ case LOG_READ_TRUNC:
+ error= "Event truncated";
+ goto err;
+ case LOG_READ_TOO_LARGE:
+ error= "Event too big";
+ goto err;
+ case LOG_READ_DECRYPT:
+ error= "Event decryption failure";
+ goto err;
+ case LOG_READ_CHECKSUM_FAILURE:
+ default:
+ DBUG_ASSERT(0);
+ error= "internal error";
+ goto err;
}
- // some events use the extra byte to null-terminate strings
- if (!(buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
- {
- error = "Out of memory";
- goto err;
- }
- buf[data_len] = 0;
- memcpy(buf, head, header_size);
- if (my_b_read(file, (uchar*) buf + header_size, data_len - header_size))
- {
- error = "read error";
- goto err;
- }
- if ((res= read_log_event(buf, data_len, &error, description_event, crc_check)))
- res->register_temp_buf(buf, TRUE);
+ if ((res= read_log_event(event.ptr(), event.length(),
+ &error, fdle, crc_check)))
+ res->register_temp_buf(event.release(), true);
err:
- UNLOCK_MUTEX;
- if (!res)
+ if (log_lock)
+ mysql_mutex_unlock(log_lock);
+ if (error)
{
- DBUG_ASSERT(error != 0);
- sql_print_error("Error in Log_event::read_log_event(): "
- "'%s', data_len: %lu, event_type: %d",
- error,data_len,(uchar)(head[EVENT_TYPE_OFFSET]));
- my_free(buf);
+ DBUG_ASSERT(!res);
+ if (event.length() >= OLD_HEADER_LEN)
+ sql_print_error("Error in Log_event::read_log_event(): '%s',"
+ " data_len: %lu, event_type: %d", error,
+ uint4korr(&event[EVENT_LEN_OFFSET]),
+ (uchar)event[EVENT_TYPE_OFFSET]);
+ else
+ sql_print_error("Error in Log_event::read_log_event(): '%s'", error);
/*
The SQL slave thread will check if file->error<0 to know
if there was an I/O error. Even if there is no "low-level" I/O errors
@@ -1487,26 +1525,25 @@ err:
/**
- Binlog format tolerance is in (buf, event_len, description_event)
+ Binlog format tolerance is in (buf, event_len, fdle)
constructors.
*/
Log_event* Log_event::read_log_event(const char* buf, uint event_len,
const char **error,
- const Format_description_log_event *description_event,
+ const Format_description_log_event *fdle,
my_bool crc_check)
{
Log_event* ev;
- uint8 alg;
+ enum enum_binlog_checksum_alg alg;
DBUG_ENTER("Log_event::read_log_event(char*,...)");
- DBUG_ASSERT(description_event != 0);
- DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version));
- DBUG_DUMP("data", (unsigned char*) buf, event_len);
+ DBUG_ASSERT(fdle != 0);
+ DBUG_PRINT("info", ("binlog_version: %d", fdle->binlog_version));
+ DBUG_DUMP_EVENT_BUF(buf, event_len);
/* Check the integrity */
if (event_len < EVENT_LEN_OFFSET ||
- (uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
- (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
+ (uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT)
{
*error="Sanity check failed"; // Needed to free buffer
DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
@@ -1515,16 +1552,16 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
uint event_type= (uchar)buf[EVENT_TYPE_OFFSET];
// all following START events in the current file are without checksum
if (event_type == START_EVENT_V3)
- (const_cast< Format_description_log_event *>(description_event))->checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
+ (const_cast< Format_description_log_event *>(fdle))->checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
/*
CRC verification by SQL and Show-Binlog-Events master side.
- The caller has to provide @description_event->checksum_alg to
+ The caller has to provide @fdle->checksum_alg to
be the last seen FD's (A) descriptor.
If event is FD the descriptor is in it.
Notice, FD of the binlog can be only in one instance and therefore
Show-Binlog-Events executing master side thread needs just to know
the only FD's (A) value - whereas RL can contain more.
- In the RL case, the alg is kept in FD_e (@description_event) which is reset
+ In the RL case, the alg is kept in FD_e (@fdle) which is reset
to the newer read-out event after its execution with possibly new alg descriptor.
Therefore in a typical sequence of RL:
{FD_s^0, FD_m, E_m^1} E_m^1
@@ -1536,7 +1573,7 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
Notice, a pre-checksum FD version forces alg := BINLOG_CHECKSUM_ALG_UNDEF.
*/
alg= (event_type != FORMAT_DESCRIPTION_EVENT) ?
- description_event->checksum_alg : get_checksum_alg(buf, event_len);
+ fdle->checksum_alg : get_checksum_alg(buf, event_len);
// Emulate the corruption during reading an event
DBUG_EXECUTE_IF("corrupt_read_log_event_char",
if (event_type != FORMAT_DESCRIPTION_EVENT)
@@ -1555,29 +1592,29 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
*error= "Event crc check failed! Most likely there is event corruption.";
if (force_opt)
{
- ev= new Unknown_log_event(buf, description_event);
+ ev= new Unknown_log_event(buf, fdle);
DBUG_RETURN(ev);
}
else
DBUG_RETURN(NULL);
#else
*error= ER(ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE);
- sql_print_error("%s", ER(ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE));
+ sql_print_error("%s", *error);
DBUG_RETURN(NULL);
#endif
}
- if (event_type > description_event->number_of_event_types &&
+ if (event_type > fdle->number_of_event_types &&
event_type != FORMAT_DESCRIPTION_EVENT)
{
/*
- It is unsafe to use the description_event if its post_header_len
+ It is unsafe to use the fdle if its post_header_len
array does not include the event type.
*/
DBUG_PRINT("error", ("event type %d found, but the current "
"Format_description_log_event supports only %d event "
"types", event_type,
- description_event->number_of_event_types));
+ fdle->number_of_event_types));
ev= NULL;
}
else
@@ -1592,9 +1629,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
array, which was set up when the Format_description_log_event
was read.
*/
- if (description_event->event_type_permutation)
+ if (fdle->event_type_permutation)
{
- int new_event_type= description_event->event_type_permutation[event_type];
+ int new_event_type= fdle->event_type_permutation[event_type];
DBUG_PRINT("info", ("converting event type %d to %d (%s)",
event_type, new_event_type,
get_type_str((Log_event_type)new_event_type)));
@@ -1605,104 +1642,102 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
(event_type == FORMAT_DESCRIPTION_EVENT ||
alg != BINLOG_CHECKSUM_ALG_OFF))
event_len= event_len - BINLOG_CHECKSUM_LEN;
-
+
switch(event_type) {
case QUERY_EVENT:
- ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
+ ev = new Query_log_event(buf, event_len, fdle, QUERY_EVENT);
break;
case LOAD_EVENT:
- ev = new Load_log_event(buf, event_len, description_event);
+ ev = new Load_log_event(buf, event_len, fdle);
break;
case NEW_LOAD_EVENT:
- ev = new Load_log_event(buf, event_len, description_event);
+ ev = new Load_log_event(buf, event_len, fdle);
break;
case ROTATE_EVENT:
- ev = new Rotate_log_event(buf, event_len, description_event);
+ ev = new Rotate_log_event(buf, event_len, fdle);
break;
case BINLOG_CHECKPOINT_EVENT:
- ev = new Binlog_checkpoint_log_event(buf, event_len, description_event);
+ ev = new Binlog_checkpoint_log_event(buf, event_len, fdle);
break;
case GTID_EVENT:
- ev = new Gtid_log_event(buf, event_len, description_event);
+ ev = new Gtid_log_event(buf, event_len, fdle);
break;
case GTID_LIST_EVENT:
- ev = new Gtid_list_log_event(buf, event_len, description_event);
- break;
-#ifdef HAVE_REPLICATION
- case SLAVE_EVENT: /* can never happen (unused event) */
- ev = new Slave_log_event(buf, event_len, description_event);
+ ev = new Gtid_list_log_event(buf, event_len, fdle);
break;
-#endif /* HAVE_REPLICATION */
case CREATE_FILE_EVENT:
- ev = new Create_file_log_event(buf, event_len, description_event);
+ ev = new Create_file_log_event(buf, event_len, fdle);
break;
case APPEND_BLOCK_EVENT:
- ev = new Append_block_log_event(buf, event_len, description_event);
+ ev = new Append_block_log_event(buf, event_len, fdle);
break;
case DELETE_FILE_EVENT:
- ev = new Delete_file_log_event(buf, event_len, description_event);
+ ev = new Delete_file_log_event(buf, event_len, fdle);
break;
case EXEC_LOAD_EVENT:
- ev = new Execute_load_log_event(buf, event_len, description_event);
+ ev = new Execute_load_log_event(buf, event_len, fdle);
break;
case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
- ev = new Start_log_event_v3(buf, event_len, description_event);
+ ev = new Start_log_event_v3(buf, event_len, fdle);
break;
case STOP_EVENT:
- ev = new Stop_log_event(buf, description_event);
+ ev = new Stop_log_event(buf, fdle);
break;
case INTVAR_EVENT:
- ev = new Intvar_log_event(buf, description_event);
+ ev = new Intvar_log_event(buf, fdle);
break;
case XID_EVENT:
- ev = new Xid_log_event(buf, description_event);
+ ev = new Xid_log_event(buf, fdle);
break;
case RAND_EVENT:
- ev = new Rand_log_event(buf, description_event);
+ ev = new Rand_log_event(buf, fdle);
break;
case USER_VAR_EVENT:
- ev = new User_var_log_event(buf, event_len, description_event);
+ ev = new User_var_log_event(buf, event_len, fdle);
break;
case FORMAT_DESCRIPTION_EVENT:
- ev = new Format_description_log_event(buf, event_len, description_event);
+ ev = new Format_description_log_event(buf, event_len, fdle);
break;
#if defined(HAVE_REPLICATION)
case PRE_GA_WRITE_ROWS_EVENT:
- ev = new Write_rows_log_event_old(buf, event_len, description_event);
+ ev = new Write_rows_log_event_old(buf, event_len, fdle);
break;
case PRE_GA_UPDATE_ROWS_EVENT:
- ev = new Update_rows_log_event_old(buf, event_len, description_event);
+ ev = new Update_rows_log_event_old(buf, event_len, fdle);
break;
case PRE_GA_DELETE_ROWS_EVENT:
- ev = new Delete_rows_log_event_old(buf, event_len, description_event);
+ ev = new Delete_rows_log_event_old(buf, event_len, fdle);
break;
case WRITE_ROWS_EVENT_V1:
case WRITE_ROWS_EVENT:
- ev = new Write_rows_log_event(buf, event_len, description_event);
+ ev = new Write_rows_log_event(buf, event_len, fdle);
break;
case UPDATE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT:
- ev = new Update_rows_log_event(buf, event_len, description_event);
+ ev = new Update_rows_log_event(buf, event_len, fdle);
break;
case DELETE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT:
- ev = new Delete_rows_log_event(buf, event_len, description_event);
+ ev = new Delete_rows_log_event(buf, event_len, fdle);
break;
case TABLE_MAP_EVENT:
- ev = new Table_map_log_event(buf, event_len, description_event);
+ ev = new Table_map_log_event(buf, event_len, fdle);
break;
#endif
case BEGIN_LOAD_QUERY_EVENT:
- ev = new Begin_load_query_log_event(buf, event_len, description_event);
+ ev = new Begin_load_query_log_event(buf, event_len, fdle);
break;
case EXECUTE_LOAD_QUERY_EVENT:
- ev= new Execute_load_query_log_event(buf, event_len, description_event);
+ ev= new Execute_load_query_log_event(buf, event_len, fdle);
break;
case INCIDENT_EVENT:
- ev = new Incident_log_event(buf, event_len, description_event);
+ ev = new Incident_log_event(buf, event_len, fdle);
break;
case ANNOTATE_ROWS_EVENT:
- ev = new Annotate_rows_log_event(buf, event_len, description_event);
+ ev = new Annotate_rows_log_event(buf, event_len, fdle);
+ break;
+ case START_ENCRYPTION_EVENT:
+ ev = new Start_encryption_log_event(buf, event_len, fdle);
break;
default:
DBUG_PRINT("error",("Unknown event code: %d",
@@ -1715,14 +1750,16 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
if (ev)
{
ev->checksum_alg= alg;
+#ifdef MYSQL_CLIENT
if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
ev->crc= uint4korr(buf + (event_len));
+#endif
}
- DBUG_PRINT("read_event", ("%s(type_code: %d; event_len: %d)",
+ DBUG_PRINT("read_event", ("%s(type_code: %u; event_len: %u)",
ev ? ev->get_type_str() : "<unknown>",
- buf[EVENT_TYPE_OFFSET],
+ (uchar)buf[EVENT_TYPE_OFFSET],
event_len));
/*
is_valid() are small event-specific sanity tests which are
@@ -1746,7 +1783,7 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
*error= "Found invalid event in binary log";
DBUG_RETURN(0);
}
- ev= new Unknown_log_event(buf, description_event);
+ ev= new Unknown_log_event(buf, fdle);
#else
*error= "Found invalid event in binary log";
DBUG_RETURN(0);
@@ -2399,6 +2436,12 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr,
my_snprintf(typestr, typestr_length, "STRING(%d)", length);
return my_b_write_quoted_with_length(file, ptr, length);
+ case MYSQL_TYPE_DECIMAL:
+ my_b_printf(file,
+ "!! Old DECIMAL (mysql-4.1 or earlier). "
+ "Not enough metadata to display the value. ");
+ break;
+
default:
{
char tmp[5];
@@ -2769,7 +2812,7 @@ void Query_log_event::pack_info(THD *thd, Protocol *protocol)
/**
Utility function for the next method (Query_log_event::write()) .
*/
-static void write_str_with_code_and_len(uchar **dst, const char *src,
+static void store_str_with_code_and_len(uchar **dst, const char *src,
uint len, uint code)
{
/*
@@ -2794,7 +2837,7 @@ static void write_str_with_code_and_len(uchar **dst, const char *src,
will print!
*/
-bool Query_log_event::write(IO_CACHE* file)
+bool Query_log_event::write()
{
uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS];
uchar *start, *start_of_status;
@@ -2865,7 +2908,7 @@ bool Query_log_event::write(IO_CACHE* file)
}
if (catalog_len) // i.e. this var is inited (false for 4.0 events)
{
- write_str_with_code_and_len(&start,
+ store_str_with_code_and_len(&start,
catalog, catalog_len, Q_CATALOG_NZ_CODE);
/*
In 5.0.x where x<4 masters we used to store the end zero here. This was
@@ -2903,7 +2946,7 @@ bool Query_log_event::write(IO_CACHE* file)
{
/* In the TZ sys table, column Name is of length 64 so this should be ok */
DBUG_ASSERT(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH);
- write_str_with_code_and_len(&start,
+ store_str_with_code_and_len(&start,
time_zone_str, time_zone_len, Q_TIME_ZONE_CODE);
}
if (lc_time_names_number)
@@ -3023,14 +3066,13 @@ bool Query_log_event::write(IO_CACHE* file)
*/
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
- return (write_header(file, event_length) ||
- wrapper_my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) ||
- write_post_header_for_derived(file) ||
- wrapper_my_b_safe_write(file, (uchar*) start_of_status,
- (uint) (start-start_of_status)) ||
- wrapper_my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) ||
- wrapper_my_b_safe_write(file, (uchar*) query, q_len) ||
- write_footer(file)) ? 1 : 0;
+ return write_header(event_length) ||
+ write_data(buf, QUERY_HEADER_LEN) ||
+ write_post_header_for_derived() ||
+ write_data(start_of_status, (uint) (start-start_of_status)) ||
+ write_data(safe_str(db), db_len + 1) ||
+ write_data(query, q_len) ||
+ write_footer();
}
/**
@@ -3053,6 +3095,7 @@ Query_log_event::Query_log_event()
query_arg - array of char representing the query
query_length - size of the `query_arg' array
using_trans - there is a modified transactional table
+ direct - Don't cache statement
suppress_use - suppress the generation of 'USE' statements
errcode - the error code of the query
@@ -3180,10 +3223,17 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
break;
case SQLCOM_CREATE_TABLE:
+ /*
+ If we are using CREATE ... SELECT or if we are a slave
+ executing BEGIN...COMMIT (generated by CREATE...SELECT) we
+ have to use the transactional cache to ensure we don't
+ calculate any checksum for the CREATE part.
+ */
trx_cache= (lex->select_lex.item_list.elements &&
- thd->is_current_stmt_binlog_format_row());
+ thd->is_current_stmt_binlog_format_row()) ||
+ (thd->variables.option_bits & OPTION_GTID_BEGIN);
use_cache= (lex->tmp_table() &&
- thd->in_multi_stmt_transaction_mode()) || trx_cache;
+ thd->in_multi_stmt_transaction_mode()) || trx_cache;
break;
case SQLCOM_SET_OPTION:
if (lex->autocommit)
@@ -3214,8 +3264,8 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
else
cache_type= Log_event::EVENT_STMT_CACHE;
DBUG_ASSERT(cache_type != Log_event::EVENT_INVALID_CACHE);
- DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %llu",
- (ulong) flags2, sql_mode));
+ DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %llu cache_tye: %d",
+ (ulong) flags2, sql_mode, cache_type));
}
#endif /* MYSQL_CLIENT */
@@ -3425,14 +3475,10 @@ Query_log_event::Query_log_event(const char* buf, uint event_len,
break;
case Q_SQL_MODE_CODE:
{
-#ifndef DBUG_OFF
- char buff[22];
-#endif
CHECK_SPACE(pos, end, 8);
sql_mode_inited= 1;
sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is ulonglong
- DBUG_PRINT("info",("In Query_log_event, read sql_mode: %s",
- llstr(sql_mode, buff)));
+ DBUG_PRINT("info",("In Query_log_event, read sql_mode: %llu", sql_mode));
pos+= 8;
break;
}
@@ -3635,7 +3681,7 @@ Query_log_event::Query_log_event(const char* buf, uint event_len,
*/
int
Query_log_event::dummy_event(String *packet, ulong ev_offset,
- uint8 checksum_alg)
+ enum enum_binlog_checksum_alg checksum_alg)
{
uchar *p= (uchar *)packet->ptr() + ev_offset;
size_t data_len= packet->length() - ev_offset;
@@ -3715,7 +3761,7 @@ Query_log_event::dummy_event(String *packet, ulong ev_offset,
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
{
- ha_checksum crc= my_checksum(0L, p, data_len);
+ ha_checksum crc= my_checksum(0, p, data_len);
int4store(p + data_len, crc);
}
return 0;
@@ -3727,7 +3773,7 @@ Query_log_event::dummy_event(String *packet, ulong ev_offset,
*/
int
Query_log_event::begin_event(String *packet, ulong ev_offset,
- uint8 checksum_alg)
+ enum enum_binlog_checksum_alg checksum_alg)
{
uchar *p= (uchar *)packet->ptr() + ev_offset;
uchar *q= p + LOG_EVENT_HEADER_LEN;
@@ -3780,7 +3826,7 @@ Query_log_event::begin_event(String *packet, ulong ev_offset,
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
{
- ha_checksum crc= my_checksum(0L, p, data_len);
+ ha_checksum crc= my_checksum(0, p, data_len);
int4store(p + data_len, crc);
}
return 0;
@@ -4228,7 +4274,6 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
"mysql", rpl_gtid_slave_state_table_name.str,
errcode,
thd->get_stmt_da()->message());
- trans_rollback(thd);
sub_id= 0;
thd->is_slave_error= 1;
goto end;
@@ -4349,15 +4394,15 @@ compare_errors:
!ignored_error_code(expected_error))
{
rli->report(ERROR_LEVEL, 0, rgi->gtid_info(),
- "Query caused different errors on master and slave. "
- "Error on master: message (format)='%s' error code=%d ; "
- "Error on slave: actual message='%s', error code=%d. "
- "Default database: '%s'. Query: '%s'",
- ER_SAFE(expected_error),
- expected_error,
- actual_error ? thd->get_stmt_da()->message() : "no error",
- actual_error,
- print_slave_db_safe(db), query_arg);
+ "Query caused different errors on master and slave. "
+ "Error on master: message (format)='%s' error code=%d ; "
+ "Error on slave: actual message='%s', error code=%d. "
+ "Default database: '%s'. Query: '%s'",
+ ER_SAFE_THD(thd, expected_error),
+ expected_error,
+ actual_error ? thd->get_stmt_da()->message() : "no error",
+ actual_error,
+ print_slave_db_safe(db), query_arg);
thd->is_slave_error= 1;
}
/*
@@ -4464,12 +4509,6 @@ end:
DBUG_RETURN(thd->is_slave_error);
}
-int Query_log_event::do_update_pos(rpl_group_info *rgi)
-{
- return Log_event::do_update_pos(rgi);
-}
-
-
Log_event::enum_skip_reason
Query_log_event::do_shall_skip(rpl_group_info *rgi)
{
@@ -4523,7 +4562,8 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi)
bool
Query_log_event::peek_is_commit_rollback(const char *event_start,
- size_t event_len, uint8 checksum_alg)
+ size_t event_len,
+ enum enum_binlog_checksum_alg checksum_alg)
{
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
{
@@ -4657,7 +4697,7 @@ Start_log_event_v3::Start_log_event_v3(const char* buf, uint event_len,
*/
#ifndef MYSQL_CLIENT
-bool Start_log_event_v3::write(IO_CACHE* file)
+bool Start_log_event_v3::write()
{
char buff[START_V3_HEADER_LEN];
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
@@ -4665,9 +4705,9 @@ bool Start_log_event_v3::write(IO_CACHE* file)
if (!dont_set_created)
created= get_time(); // this sets when and when_sec_part as a side effect
int4store(buff + ST_CREATED_OFFSET,created);
- return (write_header(file, sizeof(buff)) ||
- wrapper_my_b_safe_write(file, (uchar*) buff, sizeof(buff)) ||
- write_footer(file));
+ return write_header(sizeof(buff)) ||
+ write_data(buff, sizeof(buff)) ||
+ write_footer();
}
#endif
@@ -4864,6 +4904,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
BINLOG_CHECKPOINT_HEADER_LEN;
post_header_len[GTID_EVENT-1]= GTID_HEADER_LEN;
post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN;
+ post_header_len[START_ENCRYPTION_EVENT-1]= START_ENCRYPTION_HEADER_LEN;
// Sanity-check that all post header lengths are initialized.
int i;
@@ -4917,7 +4958,8 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
break;
}
calc_server_version_split();
- checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_UNDEF;
+ checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+ reset_crypto();
}
@@ -4969,18 +5011,19 @@ Format_description_log_event(const char* buf,
{
/* the last bytes are the checksum alg desc and value (or value's room) */
number_of_event_types -= BINLOG_CHECKSUM_ALG_DESC_LEN;
- checksum_alg= post_header_len[number_of_event_types];
+ checksum_alg= (enum_binlog_checksum_alg)post_header_len[number_of_event_types];
}
else
{
- checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_UNDEF;
+ checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
}
+ reset_crypto();
DBUG_VOID_RETURN;
}
#ifndef MYSQL_CLIENT
-bool Format_description_log_event::write(IO_CACHE* file)
+bool Format_description_log_event::write()
{
bool ret;
bool no_checksum;
@@ -5005,12 +5048,12 @@ bool Format_description_log_event::write(IO_CACHE* file)
slave does it via marking the event according to
FD_queue checksum_alg value.
*/
- compile_time_assert(sizeof(BINLOG_CHECKSUM_ALG_DESC_LEN == 1));
+ compile_time_assert(BINLOG_CHECKSUM_ALG_DESC_LEN == 1);
#ifndef DBUG_OFF
data_written= 0; // to prepare for need_checksum assert
#endif
- uchar checksum_byte= need_checksum() ?
- checksum_alg : (uint8) BINLOG_CHECKSUM_ALG_OFF;
+ uint8 checksum_byte= (uint8)
+ (need_checksum() ? checksum_alg : BINLOG_CHECKSUM_ALG_OFF);
/*
FD of checksum-aware server is always checksum-equipped, (V) is in,
regardless of @@global.binlog_checksum policy.
@@ -5028,12 +5071,11 @@ bool Format_description_log_event::write(IO_CACHE* file)
{
checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway
}
- ret= (write_header(file, rec_size) ||
- wrapper_my_b_safe_write(file, buff, sizeof(buff)) ||
- wrapper_my_b_safe_write(file, (uchar*)post_header_len,
- number_of_event_types) ||
- wrapper_my_b_safe_write(file, &checksum_byte, sizeof(checksum_byte)) ||
- write_footer(file));
+ ret= write_header(rec_size) ||
+ write_data(buff, sizeof(buff)) ||
+ write_data(post_header_len, number_of_event_types) ||
+ write_data(&checksum_byte, sizeof(checksum_byte)) ||
+ write_footer();
if (no_checksum)
checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
return ret;
@@ -5044,7 +5086,7 @@ bool Format_description_log_event::write(IO_CACHE* file)
int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
{
int ret= 0;
- Relay_log_info const *rli= rgi->rli;
+ Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Format_description_log_event::do_apply_event");
/*
@@ -5091,8 +5133,9 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
if (!ret)
{
/* Save the information describing this binlog */
+ copy_crypto_data(rli->relay_log.description_event_for_exec);
delete rli->relay_log.description_event_for_exec;
- const_cast<Relay_log_info *>(rli)->relay_log.description_event_for_exec= this;
+ rli->relay_log.description_event_for_exec= this;
}
DBUG_RETURN(ret);
@@ -5132,6 +5175,17 @@ Format_description_log_event::do_shall_skip(rpl_group_info *rgi)
#endif
+bool Format_description_log_event::start_decryption(Start_encryption_log_event* sele)
+{
+ DBUG_ASSERT(crypto_data.scheme == 0);
+
+ if (!sele->is_valid())
+ return 1;
+
+ memcpy(crypto_data.nonce, sele->nonce, BINLOG_NONCE_LENGTH);
+ return crypto_data.init(sele->crypto_scheme, sele->key_version);
+}
+
static inline void
do_server_version_split(char* version,
Format_description_log_event::master_version_split *split_versions)
@@ -5215,9 +5269,9 @@ Format_description_log_event::is_version_before_checksum(const master_version_sp
checksum-unaware (effectively no checksum) and the actuall
[1-254] range alg descriptor.
*/
-uint8 get_checksum_alg(const char* buf, ulong len)
+enum enum_binlog_checksum_alg get_checksum_alg(const char* buf, ulong len)
{
- uint8 ret;
+ enum enum_binlog_checksum_alg ret;
char version[ST_SERVER_VER_LEN];
Format_description_log_event::master_version_split version_split;
@@ -5230,16 +5284,68 @@ uint8 get_checksum_alg(const char* buf, ulong len)
version[ST_SERVER_VER_LEN - 1]= 0;
do_server_version_split(version, &version_split);
- ret= Format_description_log_event::is_version_before_checksum(&version_split) ?
- (uint8) BINLOG_CHECKSUM_ALG_UNDEF :
- * (uint8*) (buf + len - BINLOG_CHECKSUM_LEN - BINLOG_CHECKSUM_ALG_DESC_LEN);
+ ret= Format_description_log_event::is_version_before_checksum(&version_split)
+ ? BINLOG_CHECKSUM_ALG_UNDEF
+ : (enum_binlog_checksum_alg)buf[len - BINLOG_CHECKSUM_LEN - BINLOG_CHECKSUM_ALG_DESC_LEN];
DBUG_ASSERT(ret == BINLOG_CHECKSUM_ALG_OFF ||
ret == BINLOG_CHECKSUM_ALG_UNDEF ||
ret == BINLOG_CHECKSUM_ALG_CRC32);
DBUG_RETURN(ret);
}
-
+Start_encryption_log_event::Start_encryption_log_event(
+ const char* buf, uint event_len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
+{
+ if ((int)event_len ==
+ LOG_EVENT_MINIMAL_HEADER_LEN + Start_encryption_log_event::get_data_size())
+ {
+ buf += LOG_EVENT_MINIMAL_HEADER_LEN;
+ crypto_scheme = *(uchar*)buf;
+ key_version = uint4korr(buf + BINLOG_CRYPTO_SCHEME_LENGTH);
+ memcpy(nonce,
+ buf + BINLOG_CRYPTO_SCHEME_LENGTH + BINLOG_KEY_VERSION_LENGTH,
+ BINLOG_NONCE_LENGTH);
+ }
+ else
+ crypto_scheme= ~0; // invalid
+}
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+int Start_encryption_log_event::do_apply_event(rpl_group_info* rgi)
+{
+ return rgi->rli->relay_log.description_event_for_exec->start_decryption(this);
+}
+
+int Start_encryption_log_event::do_update_pos(rpl_group_info *rgi)
+{
+ /*
+ master never sends Start_encryption_log_event, any SELE that a slave
+ might see was created locally in MYSQL_BIN_LOG::open() on the slave
+ */
+ rgi->inc_event_relay_log_pos();
+ return 0;
+}
+
+#endif
+
+#ifndef MYSQL_SERVER
+void Start_encryption_log_event::print(FILE* file,
+ PRINT_EVENT_INFO* print_event_info)
+{
+ Write_on_release_cache cache(&print_event_info->head_cache, file);
+ StringBuffer<1024> buf;
+ buf.append(STRING_WITH_LEN("# Encryption scheme: "));
+ buf.append_ulonglong(crypto_scheme);
+ buf.append(STRING_WITH_LEN(", key_version: "));
+ buf.append_ulonglong(key_version);
+ buf.append(STRING_WITH_LEN(", nonce: "));
+ buf.append_hex(nonce, BINLOG_NONCE_LENGTH);
+ buf.append(STRING_WITH_LEN("\n# The rest of the binlog is encrypted!\n"));
+ my_b_write(&cache, (uchar*)buf.ptr(), buf.length());
+}
+#endif
/**************************************************************************
Load_log_event methods
General note about Load_log_event: the binlogging of LOAD DATA INFILE is
@@ -5379,7 +5485,7 @@ void Load_log_event::pack_info(THD *thd, Protocol *protocol)
Load_log_event::write_data_header()
*/
-bool Load_log_event::write_data_header(IO_CACHE* file)
+bool Load_log_event::write_data_header()
{
char buf[LOAD_HEADER_LEN];
int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
@@ -5388,7 +5494,7 @@ bool Load_log_event::write_data_header(IO_CACHE* file)
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
buf[L_DB_LEN_OFFSET] = (char)db_len;
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
- return my_b_safe_write(file, (uchar*)buf, LOAD_HEADER_LEN) != 0;
+ return write_data(buf, LOAD_HEADER_LEN) != 0;
}
@@ -5396,19 +5502,19 @@ bool Load_log_event::write_data_header(IO_CACHE* file)
Load_log_event::write_data_body()
*/
-bool Load_log_event::write_data_body(IO_CACHE* file)
+bool Load_log_event::write_data_body()
{
- if (sql_ex.write_data(file))
+ if (sql_ex.write_data(writer))
return 1;
if (num_fields && fields && field_lens)
{
- if (my_b_safe_write(file, (uchar*)field_lens, num_fields) ||
- my_b_safe_write(file, (uchar*)fields, field_block_len))
+ if (write_data(field_lens, num_fields) ||
+ write_data(fields, field_block_len))
return 1;
}
- return (my_b_safe_write(file, (uchar*)table_name, table_name_len + 1) ||
- my_b_safe_write(file, (uchar*)db, db_len + 1) ||
- my_b_safe_write(file, (uchar*)fname, fname_len));
+ return (write_data(table_name, table_name_len + 1) ||
+ write_data(db, db_len + 1) ||
+ write_data(fname, fname_len));
}
@@ -5719,8 +5825,10 @@ void Load_log_event::set_fields(const char* affected_db,
const char* field = fields;
for (i= 0; i < num_fields; i++)
{
- field_list.push_back(new Item_field(context,
- affected_db, table_name, field));
+ field_list.push_back(new (thd->mem_root)
+ Item_field(thd, context, affected_db, table_name,
+ field),
+ thd->mem_root);
field+= field_lens[i] + 1;
}
}
@@ -5828,7 +5936,6 @@ int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi,
}
else
{
- char llbuff[22];
enum enum_duplicates handle_dup;
bool ignore= 0;
char query_buffer[1024];
@@ -5929,10 +6036,9 @@ int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi,
{
/* log_pos is the position of the LOAD event in the master log */
sql_print_warning("Slave: load data infile on table '%s' at "
- "log position %s in log '%s' produced %ld "
+ "log position %llu in log '%s' produced %ld "
"warning(s). Default database: '%s'",
- (char*) table_name,
- llstr(log_pos,llbuff), RPL_LOG_NAME,
+ (char*) table_name, log_pos, RPL_LOG_NAME,
(ulong) thd->cuted_fields,
print_slave_db_safe(thd->db));
}
@@ -6000,7 +6106,7 @@ error:
else
{
sql_errno=ER_UNKNOWN_ERROR;
- err=ER(sql_errno);
+ err= ER_THD(thd, sql_errno);
}
rli->report(ERROR_LEVEL, sql_errno, rgi->gtid_info(), "\
Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'",
@@ -6020,7 +6126,7 @@ Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'",
print_slave_db_safe(remember_db));
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(),
- ER(ER_SLAVE_FATAL_ERROR), buf);
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR), buf);
DBUG_RETURN(1);
}
@@ -6040,12 +6146,11 @@ Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'",
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
void Rotate_log_event::pack_info(THD *thd, Protocol *protocol)
{
- char buf1[256], buf[22];
- String tmp(buf1, sizeof(buf1), log_cs);
+ StringBuffer<256> tmp(log_cs);
tmp.length(0);
tmp.append(new_log_ident, ident_len);
tmp.append(STRING_WITH_LEN(";pos="));
- tmp.append(llstr(pos,buf));
+ tmp.append_ulonglong(pos);
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
}
#endif
@@ -6087,12 +6192,9 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg :
(uint) strlen(new_log_ident_arg)), flags(flags_arg)
{
-#ifndef DBUG_OFF
- char buff[22];
DBUG_ENTER("Rotate_log_event::Rotate_log_event(...,flags)");
- DBUG_PRINT("enter",("new_log_ident: %s pos: %s flags: %lu", new_log_ident_arg,
- llstr(pos_arg, buff), (ulong) flags));
-#endif
+ DBUG_PRINT("enter",("new_log_ident: %s pos: %llu flags: %lu", new_log_ident_arg,
+ pos_arg, (ulong) flags));
cache_type= EVENT_NO_CACHE;
if (flags & DUP_NAME)
new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
@@ -6129,15 +6231,14 @@ Rotate_log_event::Rotate_log_event(const char* buf, uint event_len,
*/
#ifndef MYSQL_CLIENT
-bool Rotate_log_event::write(IO_CACHE* file)
+bool Rotate_log_event::write()
{
char buf[ROTATE_HEADER_LEN];
int8store(buf + R_POS_OFFSET, pos);
- return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
- wrapper_my_b_safe_write(file, (uchar*) buf, ROTATE_HEADER_LEN) ||
- wrapper_my_b_safe_write(file, (uchar*) new_log_ident,
- (uint) ident_len) ||
- write_footer(file));
+ return (write_header(ROTATE_HEADER_LEN + ident_len) ||
+ write_data(buf, ROTATE_HEADER_LEN) ||
+ write_data(new_log_ident, (uint) ident_len) ||
+ write_footer());
}
#endif
@@ -6161,14 +6262,11 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Rotate_log_event::do_update_pos");
-#ifndef DBUG_OFF
- char buf[32];
-#endif
DBUG_PRINT("info", ("server_id=%lu; ::server_id=%lu",
(ulong) this->server_id, (ulong) global_system_variables.server_id));
DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident));
- DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf)));
+ DBUG_PRINT("info", ("pos: %llu", this->pos));
/*
If we are in a transaction or in a group: the only normal case is
@@ -6330,15 +6428,14 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event(
#ifndef MYSQL_CLIENT
-bool Binlog_checkpoint_log_event::write(IO_CACHE *file)
+bool Binlog_checkpoint_log_event::write()
{
uchar buf[BINLOG_CHECKPOINT_HEADER_LEN];
int4store(buf, binlog_file_len);
- return write_header(file, BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) ||
- wrapper_my_b_safe_write(file, buf, BINLOG_CHECKPOINT_HEADER_LEN) ||
- wrapper_my_b_safe_write(file, (const uchar *)binlog_file_name,
- binlog_file_len) ||
- write_footer(file);
+ return write_header(BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) ||
+ write_data(buf, BINLOG_CHECKPOINT_HEADER_LEN) ||
+ write_data(binlog_file_name, binlog_file_len) ||
+ write_footer();
}
#endif /* MYSQL_CLIENT */
@@ -6409,7 +6506,7 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
*/
bool
Gtid_log_event::peek(const char *event_start, size_t event_len,
- uint8 checksum_alg,
+ enum enum_binlog_checksum_alg checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2, const Format_description_log_event *fdev)
{
@@ -6440,7 +6537,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len,
bool
-Gtid_log_event::write(IO_CACHE *file)
+Gtid_log_event::write()
{
uchar buf[GTID_HEADER_LEN+2];
size_t write_len;
@@ -6458,9 +6555,9 @@ Gtid_log_event::write(IO_CACHE *file)
bzero(buf+13, GTID_HEADER_LEN-13);
write_len= GTID_HEADER_LEN;
}
- return write_header(file, write_len) ||
- wrapper_my_b_safe_write(file, buf, write_len) ||
- write_footer(file);
+ return write_header(write_len) ||
+ write_data(buf, write_len) ||
+ write_footer();
}
@@ -6475,7 +6572,8 @@ Gtid_log_event::write(IO_CACHE *file)
*/
int
Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
- ulong ev_offset, uint8 checksum_alg)
+ ulong ev_offset,
+ enum enum_binlog_checksum_alg checksum_alg)
{
uchar flags2;
if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
@@ -6817,7 +6915,7 @@ Gtid_list_log_event::to_packet(String *packet)
bool
-Gtid_list_log_event::write(IO_CACHE *file)
+Gtid_list_log_event::write()
{
char buf[128];
String packet(buf, sizeof(buf), system_charset_info);
@@ -6825,10 +6923,9 @@ Gtid_list_log_event::write(IO_CACHE *file)
packet.length(0);
if (to_packet(&packet))
return true;
- return
- write_header(file, get_data_size()) ||
- wrapper_my_b_safe_write(file, (uchar *)packet.ptr(), packet.length()) ||
- write_footer(file);
+ return write_header(get_data_size()) ||
+ write_data(packet.ptr(), packet.length()) ||
+ write_footer();
}
@@ -6930,7 +7027,7 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
*/
bool
Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
- uint8 checksum_alg,
+ enum enum_binlog_checksum_alg checksum_alg,
rpl_gtid **out_gtid_list, uint32 *out_list_len,
const Format_description_log_event *fdev)
{
@@ -7033,14 +7130,14 @@ const char* Intvar_log_event::get_var_type_name()
*/
#ifndef MYSQL_CLIENT
-bool Intvar_log_event::write(IO_CACHE* file)
+bool Intvar_log_event::write()
{
uchar buf[9];
buf[I_TYPE_OFFSET]= (uchar) type;
int8store(buf + I_VAL_OFFSET, val);
- return (write_header(file, sizeof(buf)) ||
- wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
- write_footer(file));
+ return write_header(sizeof(buf)) ||
+ write_data(buf, sizeof(buf)) ||
+ write_footer();
}
#endif
@@ -7163,14 +7260,14 @@ Rand_log_event::Rand_log_event(const char* buf,
#ifndef MYSQL_CLIENT
-bool Rand_log_event::write(IO_CACHE* file)
+bool Rand_log_event::write()
{
uchar buf[16];
int8store(buf + RAND_SEED1_OFFSET, seed1);
int8store(buf + RAND_SEED2_OFFSET, seed2);
- return (write_header(file, sizeof(buf)) ||
- wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
- write_footer(file));
+ return write_header(sizeof(buf)) ||
+ write_data(buf, sizeof(buf)) ||
+ write_footer();
}
#endif
@@ -7289,12 +7386,12 @@ Xid_log_event(const char* buf,
#ifndef MYSQL_CLIENT
-bool Xid_log_event::write(IO_CACHE* file)
+bool Xid_log_event::write()
{
DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
- return (write_header(file, sizeof(xid)) ||
- wrapper_my_b_safe_write(file, (uchar*) &xid, sizeof(xid)) ||
- write_footer(file));
+ return write_header(sizeof(xid)) ||
+ write_data((uchar*)&xid, sizeof(xid)) ||
+ write_footer();
}
#endif
@@ -7359,7 +7456,6 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
"%s.%s: %d: %s",
"mysql", rpl_gtid_slave_state_table_name.str, ec,
thd->get_stmt_da()->message());
- trans_rollback(thd);
thd->is_slave_error= 1;
return err;
}
@@ -7641,7 +7737,7 @@ err:
#ifndef MYSQL_CLIENT
-bool User_var_log_event::write(IO_CACHE* file)
+bool User_var_log_event::write()
{
char buf[UV_NAME_LEN_SIZE];
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
@@ -7696,13 +7792,13 @@ bool User_var_log_event::write(IO_CACHE* file)
/* Length of the whole event */
event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len;
- return (write_header(file, event_length) ||
- wrapper_my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
- wrapper_my_b_safe_write(file, (uchar*) name, name_len) ||
- wrapper_my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
- wrapper_my_b_safe_write(file, pos, val_len) ||
- wrapper_my_b_safe_write(file, &flags, unsigned_len) ||
- write_footer(file));
+ return write_header(event_length) ||
+ write_data(buf, sizeof(buf)) ||
+ write_data(name, name_len) ||
+ write_data(buf1, buf1_length) ||
+ write_data(pos, val_len) ||
+ write_data(&flags, unsigned_len) ||
+ write_footer();
}
#endif
@@ -7849,33 +7945,33 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi)
if (is_null)
{
- it= new Item_null();
+ it= new (thd->mem_root) Item_null(thd);
}
else
{
switch (type) {
case REAL_RESULT:
float8get(real_val, val);
- it= new Item_float(real_val, 0);
+ it= new (thd->mem_root) Item_float(thd, real_val, 0);
val= (char*) &real_val; // Pointer to value in native format
val_len= 8;
break;
case INT_RESULT:
int_val= (longlong) uint8korr(val);
- it= new Item_int(int_val);
+ it= new (thd->mem_root) Item_int(thd, int_val);
val= (char*) &int_val; // Pointer to value in native format
val_len= 8;
break;
case DECIMAL_RESULT:
{
- Item_decimal *dec= new Item_decimal((uchar*) val+2, val[0], val[1]);
+ Item_decimal *dec= new (thd->mem_root) Item_decimal(thd, (uchar*) val+2, val[0], val[1]);
it= dec;
val= (char *)dec->val_decimal(NULL);
val_len= sizeof(my_decimal);
break;
}
case STRING_RESULT:
- it= new Item_string(val, val_len, charset);
+ it= new (thd->mem_root) Item_string(thd, val, val_len, charset);
break;
case ROW_RESULT:
default:
@@ -7884,7 +7980,7 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi)
}
}
- Item_func_set_user_var *e= new Item_func_set_user_var(user_var_name, it);
+ Item_func_set_user_var *e= new (thd->mem_root) Item_func_set_user_var(thd, user_var_name, it);
/*
Item_func_set_user_var can't substitute something else on its place =>
0 can be passed as last argument (reference on item)
@@ -7932,11 +8028,6 @@ User_var_log_event::do_shall_skip(rpl_group_info *rgi)
}
#endif /* !MYSQL_CLIENT */
-
-/**************************************************************************
- Slave_log_event methods
-**************************************************************************/
-
#ifdef HAVE_REPLICATION
#ifdef MYSQL_CLIENT
void Unknown_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info)
@@ -7950,152 +8041,6 @@ void Unknown_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info
}
#endif
-#ifndef MYSQL_CLIENT
-void Slave_log_event::pack_info(THD *thd, Protocol *protocol)
-{
- char buf[256+HOSTNAME_LENGTH], *pos;
- pos= strmov(buf, "host=");
- pos= strnmov(pos, master_host, HOSTNAME_LENGTH);
- pos= strmov(pos, ",port=");
- pos= int10_to_str((long) master_port, pos, 10);
- pos= strmov(pos, ",log=");
- pos= strmov(pos, master_log);
- pos= strmov(pos, ",pos=");
- pos= longlong10_to_str(master_pos, pos, 10);
- protocol->store(buf, pos-buf, &my_charset_bin);
-}
-#endif /* !MYSQL_CLIENT */
-
-
-#ifndef MYSQL_CLIENT
-/**
- @todo
- re-write this better without holding both locks at the same time
-*/
-Slave_log_event::Slave_log_event(THD* thd_arg,
- Relay_log_info* rli)
- :Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0)
-{
- DBUG_ENTER("Slave_log_event");
- if (!rli->inited) // QQ When can this happen ?
- DBUG_VOID_RETURN;
-
- Master_info* mi = rli->mi;
- // TODO: re-write this better without holding both locks at the same time
- mysql_mutex_lock(&mi->data_lock);
- mysql_mutex_lock(&rli->data_lock);
- master_host_len = strlen(mi->host);
- master_log_len = strlen(rli->group_master_log_name);
- // on OOM, just do not initialize the structure and print the error
- if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
- MYF(MY_WME))))
- {
- master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
- memcpy(master_host, mi->host, master_host_len + 1);
- master_log = master_host + master_host_len + 1;
- memcpy(master_log, rli->group_master_log_name, master_log_len + 1);
- master_port = mi->port;
- master_pos = rli->group_master_log_pos;
- DBUG_PRINT("info", ("master_log: %s pos: %lu", master_log,
- (ulong) master_pos));
- }
- else
- sql_print_error("Out of memory while recording slave event");
- mysql_mutex_unlock(&rli->data_lock);
- mysql_mutex_unlock(&mi->data_lock);
- DBUG_VOID_RETURN;
-}
-#endif /* !MYSQL_CLIENT */
-
-
-Slave_log_event::~Slave_log_event()
-{
- my_free(mem_pool);
-}
-
-
-#ifdef MYSQL_CLIENT
-void Slave_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
-{
- Write_on_release_cache cache(&print_event_info->head_cache, file);
-
- char llbuff[22];
- if (print_event_info->short_form)
- return;
- print_header(&cache, print_event_info, FALSE);
- my_b_printf(&cache, "\n\
-Slave: master_host: '%s' master_port: %d master_log: '%s' master_pos: %s\n",
- master_host, master_port, master_log, llstr(master_pos, llbuff));
-}
-#endif /* MYSQL_CLIENT */
-
-
-int Slave_log_event::get_data_size()
-{
- return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
-}
-
-
-#ifndef MYSQL_CLIENT
-bool Slave_log_event::write(IO_CACHE* file)
-{
- ulong event_length= get_data_size();
- int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
- int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
- // log and host are already there
-
- return (write_header(file, event_length) ||
- my_b_safe_write(file, (uchar*) mem_pool, event_length));
-}
-#endif
-
-
-void Slave_log_event::init_from_mem_pool(int data_size)
-{
- master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
- master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
- master_host = mem_pool + SL_MASTER_HOST_OFFSET;
- master_host_len = (uint) strlen(master_host);
- // safety
- master_log = master_host + master_host_len + 1;
- if (master_log > mem_pool + data_size)
- {
- master_host = 0;
- return;
- }
- master_log_len = (uint) strlen(master_log);
-}
-
-
-/** This code is not used, so has not been updated to be format-tolerant. */
-/* We are using description_event so that slave does not crash on Log_event
- constructor */
-Slave_log_event::Slave_log_event(const char* buf,
- uint event_len,
- const Format_description_log_event* description_event)
- :Log_event(buf,description_event),mem_pool(0),master_host(0)
-{
- if (event_len < LOG_EVENT_HEADER_LEN)
- return;
- event_len -= LOG_EVENT_HEADER_LEN;
- if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME))))
- return;
- memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
- mem_pool[event_len] = 0;
- init_from_mem_pool(event_len);
-}
-
-
-#ifndef MYSQL_CLIENT
-int Slave_log_event::do_apply_event(rpl_group_info *rgi)
-{
- if (mysql_bin_log.is_open())
- return mysql_bin_log.write(this);
- return 0;
-}
-#endif /* !MYSQL_CLIENT */
-
-
/**************************************************************************
Stop_log_event methods
**************************************************************************/
@@ -8191,13 +8136,13 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex,
Create_file_log_event::write_data_body()
*/
-bool Create_file_log_event::write_data_body(IO_CACHE* file)
+bool Create_file_log_event::write_data_body()
{
bool res;
- if ((res= Load_log_event::write_data_body(file)) || fake_base)
+ if ((res= Load_log_event::write_data_body()) || fake_base)
return res;
- return (my_b_safe_write(file, (uchar*) "", 1) ||
- my_b_safe_write(file, (uchar*) block, block_len));
+ return write_data("", 1) ||
+ write_data(block, block_len);
}
@@ -8205,14 +8150,14 @@ bool Create_file_log_event::write_data_body(IO_CACHE* file)
Create_file_log_event::write_data_header()
*/
-bool Create_file_log_event::write_data_header(IO_CACHE* file)
+bool Create_file_log_event::write_data_header()
{
bool res;
uchar buf[CREATE_FILE_HEADER_LEN];
- if ((res= Load_log_event::write_data_header(file)) || fake_base)
+ if ((res= Load_log_event::write_data_header()) || fake_base)
return res;
int4store(buf + CF_FILE_ID_OFFSET, file_id);
- return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
+ return write_data(buf, CREATE_FILE_HEADER_LEN) != 0;
}
@@ -8220,11 +8165,11 @@ bool Create_file_log_event::write_data_header(IO_CACHE* file)
Create_file_log_event::write_base()
*/
-bool Create_file_log_event::write_base(IO_CACHE* file)
+bool Create_file_log_event::write_base()
{
bool res;
fake_base= 1; // pretend we are Load event
- res= write(file);
+ res= write();
fake_base= 0;
return res;
}
@@ -8371,6 +8316,7 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi)
char *ext;
int fd = -1;
IO_CACHE file;
+ Log_event_writer lew(&file);
int error = 1;
Relay_log_info const *rli= rgi->rli;
@@ -8396,7 +8342,8 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi)
// a trick to avoid allocating another buffer
fname= fname_buf;
fname_len= (uint) (strmov(ext, ".data") - fname);
- if (write_base(&file))
+ writer= &lew;
+ if (write_base())
{
strmov(ext, ".info"); // to have it right in the error message
rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(),
@@ -8487,14 +8434,14 @@ Append_block_log_event::Append_block_log_event(const char* buf, uint len,
*/
#ifndef MYSQL_CLIENT
-bool Append_block_log_event::write(IO_CACHE* file)
+bool Append_block_log_event::write()
{
uchar buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id);
- return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
- wrapper_my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
- wrapper_my_b_safe_write(file, (uchar*) block, block_len) ||
- write_footer(file));
+ return write_header(APPEND_BLOCK_HEADER_LEN + block_len) ||
+ write_data(buf, APPEND_BLOCK_HEADER_LEN) ||
+ write_data(block, block_len) ||
+ write_footer();
}
#endif
@@ -8647,13 +8594,13 @@ Delete_file_log_event::Delete_file_log_event(const char* buf, uint len,
*/
#ifndef MYSQL_CLIENT
-bool Delete_file_log_event::write(IO_CACHE* file)
+bool Delete_file_log_event::write()
{
uchar buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id);
- return (write_header(file, sizeof(buf)) ||
- wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
- write_footer(file));
+ return write_header(sizeof(buf)) ||
+ write_data(buf, sizeof(buf)) ||
+ write_footer();
}
#endif
@@ -8747,13 +8694,13 @@ Execute_load_log_event::Execute_load_log_event(const char* buf, uint len,
*/
#ifndef MYSQL_CLIENT
-bool Execute_load_log_event::write(IO_CACHE* file)
+bool Execute_load_log_event::write()
{
uchar buf[EXEC_LOAD_HEADER_LEN];
int4store(buf + EL_FILE_ID_OFFSET, file_id);
- return (write_header(file, sizeof(buf)) ||
- wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
- write_footer(file));
+ return write_header(sizeof(buf)) ||
+ write_data(buf, sizeof(buf)) ||
+ write_footer();
}
#endif
@@ -8982,14 +8929,14 @@ ulong Execute_load_query_log_event::get_post_header_size_for_derived()
#ifndef MYSQL_CLIENT
bool
-Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
+Execute_load_query_log_event::write_post_header_for_derived()
{
uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
int4store(buf, file_id);
int4store(buf + 4, fn_pos_start);
int4store(buf + 4 + 4, fn_pos_end);
*(buf + 4 + 4 + 4)= (uchar) dup_handling;
- return wrapper_my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
+ return write_data(buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
}
#endif
@@ -9084,7 +9031,7 @@ Execute_load_query_log_event::do_apply_event(rpl_group_info *rgi)
if (buf == NULL)
{
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(),
- ER(ER_SLAVE_FATAL_ERROR), "Not enough memory");
+ ER_THD(rgi->thd, ER_SLAVE_FATAL_ERROR), "Not enough memory");
return 1;
}
@@ -9133,40 +9080,6 @@ Execute_load_query_log_event::do_apply_event(rpl_group_info *rgi)
**************************************************************************/
/*
- sql_ex_info::write_data()
-*/
-
-bool sql_ex_info::write_data(IO_CACHE* file)
-{
- if (new_format())
- {
- return (write_str(file, field_term, (uint) field_term_len) ||
- write_str(file, enclosed, (uint) enclosed_len) ||
- write_str(file, line_term, (uint) line_term_len) ||
- write_str(file, line_start, (uint) line_start_len) ||
- write_str(file, escaped, (uint) escaped_len) ||
- my_b_safe_write(file,(uchar*) &opt_flags,1));
- }
- else
- {
- /**
- @todo This is sensitive to field padding. We should write a
- char[7], not an old_sql_ex. /sven
- */
- old_sql_ex old_ex;
- old_ex.field_term= *field_term;
- old_ex.enclosed= *enclosed;
- old_ex.line_term= *line_term;
- old_ex.line_start= *line_start;
- old_ex.escaped= *escaped;
- old_ex.opt_flags= opt_flags;
- old_ex.empty_flags=empty_flags;
- return my_b_safe_write(file, (uchar*) &old_ex, sizeof(old_ex)) != 0;
- }
-}
-
-
-/*
sql_ex_info::init()
*/
@@ -9216,12 +9129,54 @@ const char *sql_ex_info::init(const char *buf, const char *buf_end,
return buf;
}
+#ifndef MYSQL_CLIENT
+/*
+ write_str()
+*/
+
+static bool write_str(Log_event_writer *writer, const char *str, uint length)
+{
+ uchar tmp[1];
+ tmp[0]= (uchar) length;
+ return (writer->write_data(tmp, sizeof(tmp)) ||
+ writer->write_data((uchar*) str, length));
+}
+
+/*
+ sql_ex_info::write_data()
+*/
+
+bool sql_ex_info::write_data(Log_event_writer *writer)
+{
+ if (new_format())
+ {
+ return write_str(writer, field_term, field_term_len) ||
+ write_str(writer, enclosed, enclosed_len) ||
+ write_str(writer, line_term, line_term_len) ||
+ write_str(writer, line_start, line_start_len) ||
+ write_str(writer, escaped, escaped_len) ||
+ writer->write_data((uchar*) &opt_flags, 1);
+ }
+ else
+ {
+ uchar old_ex[7];
+ old_ex[0]= *field_term;
+ old_ex[1]= *enclosed;
+ old_ex[2]= *line_term;
+ old_ex[3]= *line_start;
+ old_ex[4]= *escaped;
+ old_ex[5]= opt_flags;
+ old_ex[6]= empty_flags;
+ return writer->write_data(old_ex, sizeof(old_ex));
+ }
+}
+
+
/**************************************************************************
Rows_log_event member functions
**************************************************************************/
-#ifndef MYSQL_CLIENT
Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
MY_BITMAP const *cols, bool is_transactional,
Log_event_type event_type)
@@ -9493,6 +9448,18 @@ int Rows_log_event::do_add_row_data(uchar *row_data, size_t length)
DBUG_ENTER("Rows_log_event::do_add_row_data");
DBUG_PRINT("enter", ("row_data: 0x%lx length: %lu", (ulong) row_data,
(ulong) length));
+
+ /*
+ If length is zero, there is nothing to write, so we just
+ return. Note that this is not an optimization, since calling
+ realloc() with size 0 means free().
+ */
+ if (length == 0)
+ {
+ m_row_count++;
+ DBUG_RETURN(0);
+ }
+
/*
Don't print debug messages when running valgrind since they can
trigger false warnings.
@@ -9849,12 +9816,19 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
longer if slave has extra columns.
*/
- DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
+ DBUG_PRINT_BITSET("debug", "Setting table's read_set from: %s", &m_cols);
bitmap_set_all(table->read_set);
+ if (get_general_type_code() == DELETE_ROWS_EVENT ||
+ get_general_type_code() == UPDATE_ROWS_EVENT)
+ bitmap_intersect(table->read_set,&m_cols);
+
bitmap_set_all(table->write_set);
- if (!get_flags(COMPLETE_ROWS_F))
- bitmap_intersect(table->write_set,&m_cols);
+
+ /* WRITE ROWS EVENTS store the bitmap in m_cols instead of m_cols_ai */
+ MY_BITMAP *after_image= ((get_general_type_code() == UPDATE_ROWS_EVENT) ?
+ &m_cols_ai : &m_cols);
+ bitmap_intersect(table->write_set, after_image);
this->slave_exec_mode= slave_exec_mode_options; // fix the mode
@@ -9882,7 +9856,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
rgi->set_row_stmt_start_timestamp();
THD_STAGE_INFO(thd, stage_executing);
- while (error == 0 && m_curr_row < m_rows_end)
+ do
{
/* in_use can have been set to NULL in close_tables_for_reopen */
THD* old_thd= table->in_use;
@@ -9930,18 +9904,14 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (!m_curr_row_end && !error)
error= unpack_current_row(rgi);
-
- // at this moment m_curr_row_end should be set
- DBUG_ASSERT(error || m_curr_row_end != NULL);
- DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
- DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
-
+
m_curr_row= m_curr_row_end;
if (error == 0 && !transactional_table)
thd->transaction.all.modified_non_trans_table=
thd->transaction.stmt.modified_non_trans_table= TRUE;
} // row processing loop
+ while (error == 0 && (m_curr_row != m_rows_end));
/*
Restore the sql_mode after the rows event is processed.
@@ -10155,7 +10125,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi)
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
#ifndef MYSQL_CLIENT
-bool Rows_log_event::write_data_header(IO_CACHE *file)
+bool Rows_log_event::write_data_header()
{
uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer
DBUG_ASSERT(m_table_id != ~0UL);
@@ -10163,14 +10133,14 @@ bool Rows_log_event::write_data_header(IO_CACHE *file)
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (wrapper_my_b_safe_write(file, buf, 6));
+ return (write_data(buf, 6));
});
int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags);
- return (wrapper_my_b_safe_write(file, buf, ROWS_HEADER_LEN));
+ return write_data(buf, ROWS_HEADER_LEN);
}
-bool Rows_log_event::write_data_body(IO_CACHE*file)
+bool Rows_log_event::write_data_body()
{
/*
Note that this should be the number of *bits*, not the number of
@@ -10183,11 +10153,10 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
- res= res || wrapper_my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
+ res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf));
DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
- res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols.bitmap,
- no_bytes_in_map(&m_cols));
+ res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols));
/*
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
*/
@@ -10195,11 +10164,11 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
{
DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai));
- res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
- no_bytes_in_map(&m_cols_ai));
+ res= res || write_data((uchar*)m_cols_ai.bitmap,
+ no_bytes_in_map(&m_cols_ai));
}
DBUG_DUMP("rows", m_rows_buf, data_size);
- res= res || wrapper_my_b_safe_write(file, m_rows_buf, (size_t) data_size);
+ res= res || write_data(m_rows_buf, (size_t) data_size);
return res;
@@ -10297,16 +10266,16 @@ bool Annotate_rows_log_event::is_valid() const
}
#ifndef MYSQL_CLIENT
-bool Annotate_rows_log_event::write_data_header(IO_CACHE *file)
+bool Annotate_rows_log_event::write_data_header()
{
return 0;
}
#endif
#ifndef MYSQL_CLIENT
-bool Annotate_rows_log_event::write_data_body(IO_CACHE *file)
+bool Annotate_rows_log_event::write_data_body()
{
- return wrapper_my_b_safe_write(file, (uchar*) m_query_txt, m_query_len);
+ return write_data(m_query_txt, m_query_len);
}
#endif
@@ -10980,13 +10949,14 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
if (thd->slave_thread)
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(),
- ER(ER_SLAVE_FATAL_ERROR), buf);
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR), buf);
else
/*
For the cases in which a 'BINLOG' statement is set to
execute in a user session
*/
- my_printf_error(ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
+ my_printf_error(ER_SLAVE_FATAL_ERROR,
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR),
MYF(0), buf);
}
@@ -11015,7 +10985,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
#ifndef MYSQL_CLIENT
-bool Table_map_log_event::write_data_header(IO_CACHE *file)
+bool Table_map_log_event::write_data_header()
{
DBUG_ASSERT(m_table_id != ~0UL);
uchar buf[TABLE_MAP_HEADER_LEN];
@@ -11023,14 +10993,14 @@ bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (wrapper_my_b_safe_write(file, buf, 6));
+ return (write_data(buf, 6));
});
int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + TM_FLAGS_OFFSET, m_flags);
- return (wrapper_my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
+ return write_data(buf, TABLE_MAP_HEADER_LEN);
}
-bool Table_map_log_event::write_data_body(IO_CACHE *file)
+bool Table_map_log_event::write_data_body()
{
DBUG_ASSERT(m_dbnam != NULL);
DBUG_ASSERT(m_tblnam != NULL);
@@ -11051,15 +11021,15 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file)
uchar mbuf[MAX_INT_WIDTH];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
- return (wrapper_my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
- wrapper_my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
- wrapper_my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
- wrapper_my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
- wrapper_my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
- wrapper_my_b_safe_write(file, m_coltype, m_colcnt) ||
- wrapper_my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
- wrapper_my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
- wrapper_my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
+ return write_data(dbuf, sizeof(dbuf)) ||
+ write_data(m_dbnam, m_dblen+1) ||
+ write_data(tbuf, sizeof(tbuf)) ||
+ write_data(m_tblnam, m_tbllen+1) ||
+ write_data(cbuf, (size_t) (cbuf_end - cbuf)) ||
+ write_data(m_coltype, m_colcnt) ||
+ write_data(mbuf, (size_t) (mbuf_end - mbuf)) ||
+ write_data(m_field_metadata, m_field_metadata_size),
+ write_data(m_null_bits, (m_colcnt + 7) / 8);
}
#endif
@@ -11111,9 +11081,9 @@ void Table_map_log_event::print(FILE *, PRINT_EVENT_INFO *print_event_info)
#if !defined(MYSQL_CLIENT)
Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
ulong tid_arg,
- MY_BITMAP const *cols,
bool is_transactional)
- : Rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional, WRITE_ROWS_EVENT_V1)
+ : Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set,
+ is_transactional, WRITE_ROWS_EVENT_V1)
{
}
#endif
@@ -11370,7 +11340,16 @@ Rows_log_event::write_row(rpl_group_info *rgi,
the size of the first row and use that value to initialize
storage engine for bulk insertion.
*/
- ulong estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row);
+ /* this is the first row to be inserted, we estimate the rows with
+ the size of the first row and use that value to initialize
+ storage engine for bulk insertion */
+ DBUG_ASSERT(!(m_curr_row > m_curr_row_end));
+ ulong estimated_rows= 0;
+ if (m_curr_row < m_curr_row_end)
+ estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row);
+ else if (m_curr_row == m_curr_row_end)
+ estimated_rows= 1;
+
table->file->ha_start_bulk_insert(estimated_rows);
}
@@ -11381,11 +11360,9 @@ Rows_log_event::write_row(rpl_group_info *rgi,
if (is_auto_inc_in_extra_columns())
m_table->next_number_field->set_null();
-#ifndef DBUG_OFF
DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
-#endif
if (invoke_triggers &&
process_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE, TRUE))
@@ -11494,11 +11471,9 @@ Rows_log_event::write_row(rpl_group_info *rgi,
error= unpack_current_row(rgi);
}
-#ifndef DBUG_OFF
DBUG_PRINT("debug",("preparing for update: before and after image"));
DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
-#endif
/*
REPLACE is defined as either INSERT or DELETE + INSERT. If
@@ -11858,10 +11833,8 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
prepare_record(table, m_width, FALSE);
error= unpack_current_row(rgi);
-#ifndef DBUG_OFF
DBUG_PRINT("info",("looking for the following record"));
DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
-#endif
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
table->s->primary_key < MAX_KEY)
@@ -12125,9 +12098,9 @@ end:
#ifndef MYSQL_CLIENT
Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
- ulong tid, MY_BITMAP const *cols,
- bool is_transactional)
- : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional, DELETE_ROWS_EVENT_V1)
+ ulong tid, bool is_transactional)
+ : Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional,
+ DELETE_ROWS_EVENT_V1)
{
}
#endif /* #if !defined(MYSQL_CLIENT) */
@@ -12255,21 +12228,11 @@ uint8 Delete_rows_log_event::get_trg_event_map()
#if !defined(MYSQL_CLIENT)
Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
ulong tid,
- MY_BITMAP const *cols_bi,
- MY_BITMAP const *cols_ai,
- bool is_transactional)
-: Rows_log_event(thd_arg, tbl_arg, tid, cols_bi, is_transactional, UPDATE_ROWS_EVENT_V1)
-{
- init(cols_ai);
-}
-
-Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
- ulong tid,
- MY_BITMAP const *cols,
bool is_transactional)
-: Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional, UPDATE_ROWS_EVENT_V1)
+: Rows_log_event(thd_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional,
+ UPDATE_ROWS_EVENT_V1)
{
- init(cols);
+ init(tbl_arg->write_set);
}
void Update_rows_log_event::init(MY_BITMAP const *cols)
@@ -12371,7 +12334,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
able to skip to the next pair of updates
*/
m_curr_row= m_curr_row_end;
- unpack_current_row(rgi);
+ unpack_current_row(rgi, &m_cols_ai);
thd_proc_info(thd, tmp);
return error;
}
@@ -12400,7 +12363,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
/* this also updates m_curr_row_end */
thd_proc_info(thd, message);
- if ((error= unpack_current_row(rgi)))
+ if ((error= unpack_current_row(rgi, &m_cols_ai)))
goto err;
/*
@@ -12433,9 +12396,15 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
goto err;
}
+ // Temporary fix to find out why it fails [/Matz]
+ memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
+ memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
+
+ m_table->mark_columns_per_binlog_row_image();
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
if (error == HA_ERR_RECORD_IS_THE_SAME)
error= 0;
+ m_table->default_column_bitmaps();
if (invoke_triggers && !error &&
process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER, TRUE))
@@ -12495,7 +12464,12 @@ Incident_log_event::Incident_log_event(const char *buf, uint event_len,
char const *const str_end= buf + event_len;
uint8 len= 0; // Assignment to keep compiler happy
const char *str= NULL; // Assignment to keep compiler happy
- read_str(&ptr, str_end, &str, &len);
+ if (read_str(&ptr, str_end, &str, &len))
+ {
+ /* Mark this event invalid */
+ m_incident= INCIDENT_NONE;
+ DBUG_VOID_RETURN;
+ }
if (!(m_message.str= (char*) my_malloc(len+1, MYF(MY_WME))))
{
/* Mark this event invalid */
@@ -12546,7 +12520,6 @@ void Incident_log_event::pack_info(THD *thd, Protocol *protocol)
#if WITH_WSREP && !defined(MYSQL_CLIENT)
-Format_description_log_event *wsrep_format_desc; // TODO: free them at the end
/*
read the first event from (*buf). The size of the (*buf) is (*buf_len).
At the end (*buf) is shitfed to point to the following event or NULL and
@@ -12616,42 +12589,34 @@ Incident_log_event::do_apply_event(rpl_group_info *rgi)
}
rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT, NULL,
- ER(ER_SLAVE_INCIDENT),
+ ER_THD(rgi->thd, ER_SLAVE_INCIDENT),
description(),
m_message.length > 0 ? m_message.str : "<none>");
DBUG_RETURN(1);
}
#endif
+#ifdef MYSQL_SERVER
bool
-Incident_log_event::write_data_header(IO_CACHE *file)
+Incident_log_event::write_data_header()
{
DBUG_ENTER("Incident_log_event::write_data_header");
DBUG_PRINT("enter", ("m_incident: %d", m_incident));
uchar buf[sizeof(int16)];
int2store(buf, (int16) m_incident);
-#ifndef MYSQL_CLIENT
- DBUG_RETURN(wrapper_my_b_safe_write(file, buf, sizeof(buf)));
-#else
- DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf)));
-#endif
+ DBUG_RETURN(write_data(buf, sizeof(buf)));
}
bool
-Incident_log_event::write_data_body(IO_CACHE *file)
+Incident_log_event::write_data_body()
{
uchar tmp[1];
DBUG_ENTER("Incident_log_event::write_data_body");
tmp[0]= (uchar) m_message.length;
- crc= my_checksum(crc, (uchar*) tmp, 1);
- if (m_message.length > 0)
- {
- crc= my_checksum(crc, (uchar*) m_message.str, m_message.length);
- // todo: report a bug on write_str accepts uint but treats it as uchar
- }
- DBUG_RETURN(write_str(file, m_message.str, (uint) m_message.length));
+ DBUG_RETURN(write_data(tmp, sizeof(tmp)) ||
+ write_data(m_message.str, m_message.length));
}
-
+#endif
#ifdef MYSQL_CLIENT
/**