summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc286
1 files changed, 242 insertions, 44 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index f6b89cd1a80..947bcbf3509 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3413,9 +3413,23 @@ void free_table_map_log_event(Table_map_log_event *event)
delete event;
}
+/**
+ Encode the event, optionally per 'do_print_encoded' arg store the
+ result into the argument cache; optionally per event_info's
+ 'verbose' print into the cache a verbose representation of the event.
+ Note, no extra wrapping is done to the being io-cached data, like
+ to producing a BINLOG query. It's left for a routine that extracts from
+ the cache.
+
+ @param file pointer to IO_CACHE
+ @param print_event_info pointer to print_event_info specializing
+ what out of and how to print the event
+ @param do_print_encoded whether to store base64-encoded event
+ into @file.
+*/
void Log_event::print_base64(IO_CACHE* file,
PRINT_EVENT_INFO* print_event_info,
- bool more)
+ bool do_print_encoded)
{
uchar *ptr= (uchar *)temp_buf;
uint32 size= uint4korr(ptr + EVENT_LEN_OFFSET);
@@ -3479,17 +3493,9 @@ void Log_event::print_base64(IO_CACHE* file,
DBUG_ASSERT(0);
}
- if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS)
- {
- if (my_b_tell(file) == 0)
- my_b_write_string(file, "\nBINLOG '\n");
-
+ if (do_print_encoded)
my_b_printf(file, "%s\n", tmp_str);
- if (!more)
- my_b_printf(file, "'%s\n", print_event_info->delimiter);
- }
-
#ifdef WHEN_FLASHBACK_REVIEW_READY
if (print_event_info->verbose || need_flashback_review)
#else
@@ -3582,7 +3588,18 @@ void Log_event::print_base64(IO_CACHE* file,
}
#else
if (print_event_info->verbose)
+ {
+ /*
+ Verbose event printout can't start before encoded data
+ got enquoted. This is done at this point though multi-row
+ statement remain vulnerable.
+ TODO: fix MDEV-10362 to remove this workaround.
+ */
+ if (print_event_info->base64_output_mode !=
+ BASE64_OUTPUT_DECODE_ROWS)
+ my_b_printf(file, "'%s\n", print_event_info->delimiter);
ev->print_verbose(file, print_event_info);
+ }
#endif
delete ev;
}
@@ -5234,6 +5251,22 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
else
thd->variables.collation_database= thd->db_charset;
+ {
+ const CHARSET_INFO *cs= thd->charset();
+ /*
+ We cannot ask for parsing a statement using a character set
+ without state_maps (parser internal data).
+ */
+ if (!cs->state_map)
+ {
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR),
+ "character_set cannot be parsed");
+ thd->is_slave_error= true;
+ goto end;
+ }
+ }
+
/*
Record any GTID in the same transaction, so slave state is
transactionally consistent.
@@ -5653,9 +5686,17 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER &&
!print_event_info->short_form)
{
- if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS)
+ /* BINLOG is matched with the delimiter below on the same level */
+ bool do_print_encoded=
+ print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS;
+ if (do_print_encoded)
my_b_printf(&cache, "BINLOG '\n");
- print_base64(&cache, print_event_info, FALSE);
+
+ print_base64(&cache, print_event_info, do_print_encoded);
+
+ if (do_print_encoded)
+ my_b_printf(&cache, "'%s\n", print_event_info->delimiter);
+
print_event_info->printed_fd_event= TRUE;
}
DBUG_VOID_RETURN;
@@ -8726,12 +8767,6 @@ User_var_log_event(const char* buf, uint event_len,
val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
UV_CHARSET_NUMBER_SIZE);
- if (val + val_len > buf_end)
- {
- error= true;
- goto err;
- }
-
/**
We need to check if this is from an old server
that did not pack information for flags.
@@ -10446,7 +10481,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
DBUG_VOID_RETURN;
}
size_t const data_size= event_len - read_size;
- DBUG_PRINT("info",("m_table_id: %lu m_flags: %d m_width: %lu data_size: %lu",
+ DBUG_PRINT("info",("m_table_id: %llu m_flags: %d m_width: %lu data_size: %lu",
m_table_id, m_flags, m_width, (ulong) data_size));
m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
@@ -10659,12 +10694,12 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
int error= 0;
/*
- If m_table_id == ~0UL, then we have a dummy event that does not
+ If m_table_id == ~0ULL, then we have a dummy event that does not
contain any data. In that case, we just remove all tables in the
tables_to_lock list, close the thread tables, and return with
success.
*/
- if (m_table_id == ~0UL)
+ if (m_table_id == ~0ULL)
{
/*
This one is supposed to be set: just an extra check so that
@@ -10930,7 +10965,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
table= m_table= rgi->m_table_map.get_table(m_table_id);
- DBUG_PRINT("debug", ("m_table:%p, m_table_id: %lu%s",
+ DBUG_PRINT("debug", ("m_table:%p, m_table_id: %llu%s",
m_table, m_table_id,
table && master_had_triggers ?
" (master had triggers)" : ""));
@@ -11288,14 +11323,14 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi)
bool Rows_log_event::write_data_header()
{
uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer
- DBUG_ASSERT(m_table_id != ~0UL);
+ DBUG_ASSERT(m_table_id != ~0ULL);
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
return (write_data(buf, 6));
});
- int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
+ int6store(buf + RW_MAPID_OFFSET, m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags);
return write_data(buf, ROWS_HEADER_LEN);
}
@@ -11363,12 +11398,159 @@ void Rows_log_event::pack_info(Protocol *protocol)
char const *const flagstr=
get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
size_t bytes= my_snprintf(buf, sizeof(buf),
- "table_id: %lu%s", m_table_id, flagstr);
+ "table_id: %llu%s", m_table_id, flagstr);
protocol->store(buf, bytes, &my_charset_bin);
}
#endif
#ifdef MYSQL_CLIENT
+class my_String : public String
+{
+public:
+ my_String() : String(), error(false) {};
+ bool error;
+
+ bool append(const LEX_STRING *ls)
+ {
+ return error= error || String::append(ls);
+ }
+ bool append(IO_CACHE* file, uint32 arg_length)
+ {
+ return error= error || String::append(file, arg_length);
+ }
+};
+
+/**
+ Print an event "body" cache to @c file possibly in two fragments.
+ Each fragement is optionally per @c do_wrap to produce an SQL statement.
+
+ @param file a file to print to
+ @param body the "body" IO_CACHE of event
+ @param do_wrap whether to wrap base64-encoded strings with
+ SQL cover.
+ @param delimiter delimiter string
+
+ The function signals on any error through setting @c body->error to -1.
+*/
+void copy_cache_to_string_wrapped(IO_CACHE *cache,
+ LEX_STRING *to,
+ bool do_wrap,
+ const char *delimiter,
+ bool is_verbose)
+{
+ const char str_binlog[]= "\nBINLOG '\n";
+ const char fmt_delim[]= "'%s\n";
+ const char fmt_n_delim[]= "\n'%s";
+ const char fmt_frag[]= "\nSET @binlog_fragment_%d ='\n";
+ const my_off_t cache_size= my_b_tell(cache);
+ my_String ret;
+ /*
+ substring to hold parts of encoded possibly defragramented event
+ whose size is roughly estimated from the top.
+ */
+ char tmp[sizeof(str_binlog) + 2*(sizeof(fmt_frag) + 2 /* %d */) +
+ sizeof(fmt_delim) + sizeof(fmt_n_delim) +
+ PRINT_EVENT_INFO::max_delimiter_size];
+ LEX_STRING str_tmp= { tmp, 0 };
+
+ if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
+ {
+ cache->error= -1;
+ goto end;
+ }
+
+ if (!do_wrap)
+ {
+ if (ret.append(cache, (uint32) cache->end_of_file))
+ {
+ cache->error= -1;
+ goto end;
+ }
+ }
+ else if (4 + sizeof(str_binlog) + cache_size + sizeof(fmt_delim) >
+ opt_binlog_rows_event_max_encoded_size)
+ {
+ /*
+ 2 fragments can always represent near 1GB row-based
+ base64-encoded event as two strings each of size less than
+ max(max_allowed_packet). Greater number of fragments does not
+ save from potential need to tweak (increase) @@max_allowed_packet
+ before to process the fragments. So 2 is safe and enough.
+
+ Split the big query when its packet size's estimation exceeds a
+ limit. The estimate includes the maximum packet header
+ contribution of non-compressed packet.
+ */
+ str_tmp.length= sprintf(str_tmp.str, fmt_frag, 0);
+ ret.append(&str_tmp);
+ ret.append(cache, (uint32) cache_size/2 + 1);
+ str_tmp.length= sprintf(str_tmp.str, fmt_n_delim, delimiter);
+ ret.append(&str_tmp);
+
+ str_tmp.length= sprintf(str_tmp.str, fmt_frag, 1);
+ ret.append(&str_tmp);
+ ret.append(cache, uint32(cache->end_of_file - (cache_size/2 + 1)));
+ if (!is_verbose)
+ {
+ str_tmp.length= sprintf(str_tmp.str, fmt_delim, delimiter);
+ ret.append(&str_tmp);
+ }
+ str_tmp.length= sprintf(str_tmp.str, "BINLOG @binlog_fragment_0, @binlog_fragment_1%s\n",
+ delimiter);
+ ret.append(&str_tmp);
+ }
+ else
+ {
+ str_tmp.length= sprintf(str_tmp.str, str_binlog);
+ ret.append(&str_tmp);
+ ret.append(cache, (uint32) cache->end_of_file);
+ if (!is_verbose)
+ {
+ str_tmp.length= sprintf(str_tmp.str, fmt_delim, delimiter);
+ ret.append(&str_tmp);
+ }
+ }
+
+ to->length= ret.length();
+ to->str= ret.release();
+
+ reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE);
+
+ if (ret.error)
+ cache->error= -1;
+end:
+ return;
+}
+
+/**
+ The function invokes base64 encoder to run on the current
+ event string and store the result into two caches.
+ When the event ends the current statement the caches are is copied into
+ the argument file.
+ Copying is also concerned how to wrap the event, specifically to produce
+ a valid SQL syntax.
+ When the encoded data size is within max(MAX_ALLOWED_PACKET)
+ a regular BINLOG query is composed. Otherwise it is build as fragmented
+
+ SET @binlog_fragment_0='...';
+ SET @binlog_fragment_1='...';
+ BINLOG @binlog_fragment_0, @binlog_fragment_1;
+
+ where fragments are represented by a pair of indexed user
+ "one shot" variables.
+
+ @note
+ If any changes made don't forget to duplicate them to
+ Old_rows_log_event as long as it's supported.
+
+ @param file pointer to IO_CACHE
+ @param print_event_info pointer to print_event_info specializing
+ what out of and how to print the event
+ @param name the name of a table that the event operates on
+
+ The function signals on any error of cache access through setting
+ that cache's @c error to -1.
+*/
void Rows_log_event::print_helper(FILE *file,
PRINT_EVENT_INFO *print_event_info,
char const *const name)
@@ -11378,32 +11560,44 @@ void Rows_log_event::print_helper(FILE *file,
#ifdef WHEN_FLASHBACK_REVIEW_READY
IO_CACHE *const sql= &print_event_info->review_sql_cache;
#endif
+ bool do_print_encoded=
+ print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
+ !print_event_info->short_form;
if (!print_event_info->short_form)
{
+
bool const last_stmt_event= get_flags(STMT_END_F);
+ char llbuff[22];
+
print_header(head, print_event_info, !last_stmt_event);
- my_b_printf(head, "\t%s: table id %lu%s\n",
- name, m_table_id,
+ my_b_printf(head, "\t%s: table id %s%s\n",
+ name, ullstr(m_table_id, llbuff),
last_stmt_event ? " flags: STMT_END_F" : "");
- print_base64(body, print_event_info, !last_stmt_event);
+ print_base64(body, print_event_info, do_print_encoded);
}
if (get_flags(STMT_END_F))
{
LEX_STRING tmp_str;
-
- copy_event_cache_to_string_and_reinit(head, &tmp_str);
+#ifdef WHEN_FLASHBACK_REVIEW_READY
+ copy_event_cache_to_string_and_reinit(sql, &tmp_str);
output_buf.append(&tmp_str);
my_free(tmp_str.str);
- copy_event_cache_to_string_and_reinit(body, &tmp_str);
+#endif
+ if (copy_event_cache_to_string_and_reinit(head, &tmp_str))
+ {
+ head->error= -1;
+ return;
+ }
output_buf.append(&tmp_str);
my_free(tmp_str.str);
-#ifdef WHEN_FLASHBACK_REVIEW_READY
- copy_event_cache_to_string_and_reinit(sql, &tmp_str);
+
+ copy_cache_to_string_wrapped(body, &tmp_str, do_print_encoded,
+ print_event_info->delimiter,
+ print_event_info->verbose);
output_buf.append(&tmp_str);
my_free(tmp_str.str);
-#endif
}
}
#endif
@@ -11651,7 +11845,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
uchar cbuf[MAX_INT_WIDTH];
uchar *cbuf_end;
DBUG_ENTER("Table_map_log_event::Table_map_log_event(TABLE)");
- DBUG_ASSERT(m_table_id != ~0UL);
+ DBUG_ASSERT(m_table_id != ~0ULL);
/*
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
table.cc / alloc_table_share():
@@ -11736,7 +11930,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
#endif
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
m_colcnt(0), m_coltype(0),
- m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
+ m_memory(NULL), m_table_id(ULONGLONG_MAX), m_flags(0),
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
m_null_bits(0), m_meta_memory(NULL)
{
@@ -11773,7 +11967,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
post_start+= TM_FLAGS_OFFSET;
}
- DBUG_ASSERT(m_table_id != ~0UL);
+ DBUG_ASSERT(m_table_id != ~0ULL);
m_flags= uint2korr(post_start);
@@ -12093,7 +12287,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
table_list->updating= 1;
table_list->required_type= FRMTYPE_TABLE;
- DBUG_PRINT("debug", ("table: %s is mapped to %lu", table_list->table_name,
+ DBUG_PRINT("debug", ("table: %s is mapped to %llu", table_list->table_name,
table_list->table_id));
table_list->master_had_triggers= ((m_flags & TM_BIT_HAS_TRIGGERS_F) ? 1 : 0);
DBUG_PRINT("debug", ("table->master_had_triggers=%d",
@@ -12194,7 +12388,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
#ifndef MYSQL_CLIENT
bool Table_map_log_event::write_data_header()
{
- DBUG_ASSERT(m_table_id != ~0UL);
+ DBUG_ASSERT(m_table_id != ~0ULL);
uchar buf[TABLE_MAP_HEADER_LEN];
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
{
@@ -12202,7 +12396,7 @@ bool Table_map_log_event::write_data_header()
int2store(buf + 4, m_flags);
return (write_data(buf, 6));
});
- int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
+ int6store(buf + TM_MAPID_OFFSET, m_table_id);
int2store(buf + TM_FLAGS_OFFSET, m_flags);
return write_data(buf, TABLE_MAP_HEADER_LEN);
}
@@ -12252,7 +12446,7 @@ void Table_map_log_event::pack_info(Protocol *protocol)
{
char buf[256];
size_t bytes= my_snprintf(buf, sizeof(buf),
- "table_id: %lu (%s.%s)",
+ "table_id: %llu (%s.%s)",
m_table_id, m_dbnam, m_tblnam);
protocol->store(buf, bytes, &my_charset_bin);
}
@@ -12267,13 +12461,17 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
{
if (!print_event_info->short_form)
{
+ char llbuff[22];
+
print_header(&print_event_info->head_cache, print_event_info, TRUE);
my_b_printf(&print_event_info->head_cache,
- "\tTable_map: %`s.%`s mapped to number %lu%s\n",
- m_dbnam, m_tblnam, m_table_id,
+ "\tTable_map: %`s.%`s mapped to number %s%s\n",
+ m_dbnam, m_tblnam, ullstr(m_table_id, llbuff),
((m_flags & TM_BIT_HAS_TRIGGERS_F) ?
" (has triggers)" : ""));
- print_base64(&print_event_info->body_cache, print_event_info, TRUE);
+ print_base64(&print_event_info->body_cache, print_event_info,
+ print_event_info->base64_output_mode !=
+ BASE64_OUTPUT_DECODE_ROWS);
copy_event_cache_to_file_and_reinit(&print_event_info->head_cache, file);
}
}