Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 286
1 files changed, 242 insertions, 44 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index f6b89cd1a80..947bcbf3509 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3413,9 +3413,23 @@ void free_table_map_log_event(Table_map_log_event *event)
   delete event;
 }
 
+/**
+  Encode the event and, per the 'do_print_encoded' argument, store the
+  result into the argument cache; optionally, per print_event_info's
+  'verbose' flag, print a verbose representation of the event into it.
+  Note that no extra wrapping (such as producing a BINLOG query) is done
+  around the cached data; that is left to the routine that extracts
+  from the cache.
+
+  @param file               pointer to IO_CACHE
+  @param print_event_info   pointer to print_event_info specifying
+                            what to print of the event and how
+  @param do_print_encoded   whether to store the base64-encoded event
+                            into @c file.
+*/
 void Log_event::print_base64(IO_CACHE* file,
                              PRINT_EVENT_INFO* print_event_info,
-                             bool more)
+                             bool do_print_encoded)
 {
   uchar *ptr= (uchar *)temp_buf;
   uint32 size= uint4korr(ptr + EVENT_LEN_OFFSET);
@@ -3479,17 +3493,9 @@ void Log_event::print_base64(IO_CACHE* file,
     DBUG_ASSERT(0);
   }
 
-  if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS)
-  {
-    if (my_b_tell(file) == 0)
-      my_b_write_string(file, "\nBINLOG '\n");
-
+  if (do_print_encoded)
     my_b_printf(file, "%s\n", tmp_str);
 
-    if (!more)
-      my_b_printf(file, "'%s\n", print_event_info->delimiter);
-  }
-
 #ifdef WHEN_FLASHBACK_REVIEW_READY
   if (print_event_info->verbose || need_flashback_review)
 #else
@@ -3582,7 +3588,18 @@ void Log_event::print_base64(IO_CACHE* file,
     }
 #else
     if (print_event_info->verbose)
+    {
+      /*
+        Verbose event printout can't start before the encoded data
+        has been quoted. That is done at this point, though multi-row
+        statements remain vulnerable.
+        TODO: fix MDEV-10362 to remove this workaround.
+      */
+      if (print_event_info->base64_output_mode !=
+          BASE64_OUTPUT_DECODE_ROWS)
+        my_b_printf(file, "'%s\n", print_event_info->delimiter);
       ev->print_verbose(file, print_event_info);
+    }
 #endif
     delete ev;
   }
@@ -5234,6 +5251,22 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
   else
     thd->variables.collation_database= thd->db_charset;
 
+  {
+    const CHARSET_INFO *cs= thd->charset();
+    /*
+      We cannot ask for parsing a statement using a character set
+      without state_maps (parser internal data).
+    */
+    if (!cs->state_map)
+    {
+      rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+                  ER_THD(thd, ER_SLAVE_FATAL_ERROR),
+                  "character_set cannot be parsed");
+      thd->is_slave_error= true;
+      goto end;
+    }
+  }
+
   /*
     Record any GTID in the same transaction, so slave
     state is transactionally consistent.
@@ -5653,9 +5686,17 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
       print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER &&
       !print_event_info->short_form)
   {
-    if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS)
+    /* BINLOG is matched with the delimiter below on the same level */
+    bool do_print_encoded=
+      print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS;
+    if (do_print_encoded)
       my_b_printf(&cache, "BINLOG '\n");
-    print_base64(&cache, print_event_info, FALSE);
+
+    print_base64(&cache, print_event_info, do_print_encoded);
+
+    if (do_print_encoded)
+      my_b_printf(&cache, "'%s\n", print_event_info->delimiter);
+
     print_event_info->printed_fd_event= TRUE;
   }
   DBUG_VOID_RETURN;
@@ -8726,12 +8767,6 @@ User_var_log_event(const char* buf, uint event_len,
     val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
                        UV_CHARSET_NUMBER_SIZE);
 
-    if (val + val_len > buf_end)
-    {
-      error= true;
-      goto err;
-    }
-
     /**
       We need to check if this is from an old server
       that did not pack information for flags.
@@ -10446,7 +10481,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
     DBUG_VOID_RETURN;
   }
   size_t const data_size= event_len - read_size;
-  DBUG_PRINT("info",("m_table_id: %lu m_flags: %d m_width: %lu data_size: %lu",
+  DBUG_PRINT("info",("m_table_id: %llu m_flags: %d m_width: %lu data_size: %lu",
                      m_table_id, m_flags, m_width, (ulong) data_size));
 
   m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
@@ -10659,12 +10694,12 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
   DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
   int error= 0;
   /*
-    If m_table_id == ~0UL, then we have a dummy event that does not
+    If m_table_id == ~0ULL, then we have a dummy event that does not
     contain any data. In that case, we just remove all tables in the
     tables_to_lock list, close the thread tables, and return with
     success.
   */
-  if (m_table_id == ~0UL)
+  if (m_table_id == ~0ULL)
   {
     /*
       This one is supposed to be set: just an extra check so that
@@ -10930,7 +10965,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
 
     table= m_table= rgi->m_table_map.get_table(m_table_id);
 
-    DBUG_PRINT("debug", ("m_table:%p, m_table_id: %lu%s",
+    DBUG_PRINT("debug", ("m_table:%p, m_table_id: %llu%s",
                          m_table, m_table_id,
                          table && master_had_triggers ?
                          " (master had triggers)" : ""));
@@ -11288,14 +11323,14 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi)
 bool Rows_log_event::write_data_header()
 {
   uchar buf[ROWS_HEADER_LEN_V2];        // No need to init the buffer
-  DBUG_ASSERT(m_table_id != ~0UL);
+  DBUG_ASSERT(m_table_id != ~0ULL);
   DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
                   {
                     int4store(buf + 0, m_table_id);
                     int2store(buf + 4, m_flags);
                     return (write_data(buf, 6));
                   });
-  int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
+  int6store(buf + RW_MAPID_OFFSET, m_table_id);
   int2store(buf + RW_FLAGS_OFFSET, m_flags);
   return write_data(buf, ROWS_HEADER_LEN);
 }
@@ -11363,12 +11398,159 @@ void Rows_log_event::pack_info(Protocol *protocol)
   char const *const flagstr=
     get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
" flags: STMT_END_F" : ""; size_t bytes= my_snprintf(buf, sizeof(buf), - "table_id: %lu%s", m_table_id, flagstr); + "table_id: %llu%s", m_table_id, flagstr); protocol->store(buf, bytes, &my_charset_bin); } #endif #ifdef MYSQL_CLIENT +class my_String : public String +{ +public: + my_String() : String(), error(false) {}; + bool error; + + bool append(const LEX_STRING *ls) + { + return error= error || String::append(ls); + } + bool append(IO_CACHE* file, uint32 arg_length) + { + return error= error || String::append(file, arg_length); + } +}; + +/** + Print an event "body" cache to @c file possibly in two fragments. + Each fragement is optionally per @c do_wrap to produce an SQL statement. + + @param file a file to print to + @param body the "body" IO_CACHE of event + @param do_wrap whether to wrap base64-encoded strings with + SQL cover. + @param delimiter delimiter string + + The function signals on any error through setting @c body->error to -1. +*/ +void copy_cache_to_string_wrapped(IO_CACHE *cache, + LEX_STRING *to, + bool do_wrap, + const char *delimiter, + bool is_verbose) +{ + const char str_binlog[]= "\nBINLOG '\n"; + const char fmt_delim[]= "'%s\n"; + const char fmt_n_delim[]= "\n'%s"; + const char fmt_frag[]= "\nSET @binlog_fragment_%d ='\n"; + const my_off_t cache_size= my_b_tell(cache); + my_String ret; + /* + substring to hold parts of encoded possibly defragramented event + whose size is roughly estimated from the top. + */ + char tmp[sizeof(str_binlog) + 2*(sizeof(fmt_frag) + 2 /* %d */) + + sizeof(fmt_delim) + sizeof(fmt_n_delim) + + PRINT_EVENT_INFO::max_delimiter_size]; + LEX_STRING str_tmp= { tmp, 0 }; + + if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) + { + cache->error= -1; + goto end; + } + + if (!do_wrap) + { + if (ret.append(cache, (uint32) cache->end_of_file)) + { + cache->error= -1; + goto end; + } + } + else if (4 + sizeof(str_binlog) + cache_size + sizeof(fmt_delim) > + opt_binlog_rows_event_max_encoded_size) + { + /* + 2 fragments can always represent near 1GB row-based + base64-encoded event as two strings each of size less than + max(max_allowed_packet). Greater number of fragments does not + save from potential need to tweak (increase) @@max_allowed_packet + before to process the fragments. So 2 is safe and enough. + + Split the big query when its packet size's estimation exceeds a + limit. The estimate includes the maximum packet header + contribution of non-compressed packet. 
+    */
+    str_tmp.length= sprintf(str_tmp.str, fmt_frag, 0);
+    ret.append(&str_tmp);
+    ret.append(cache, (uint32) cache_size/2 + 1);
+    str_tmp.length= sprintf(str_tmp.str, fmt_n_delim, delimiter);
+    ret.append(&str_tmp);
+
+    str_tmp.length= sprintf(str_tmp.str, fmt_frag, 1);
+    ret.append(&str_tmp);
+    ret.append(cache, uint32(cache->end_of_file - (cache_size/2 + 1)));
+    if (!is_verbose)
+    {
+      str_tmp.length= sprintf(str_tmp.str, fmt_delim, delimiter);
+      ret.append(&str_tmp);
+    }
+    str_tmp.length= sprintf(str_tmp.str, "BINLOG @binlog_fragment_0, @binlog_fragment_1%s\n",
+                            delimiter);
+    ret.append(&str_tmp);
+  }
+  else
+  {
+    str_tmp.length= sprintf(str_tmp.str, str_binlog);
+    ret.append(&str_tmp);
+    ret.append(cache, (uint32) cache->end_of_file);
+    if (!is_verbose)
+    {
+      str_tmp.length= sprintf(str_tmp.str, fmt_delim, delimiter);
+      ret.append(&str_tmp);
+    }
+  }
+
+  to->length= ret.length();
+  to->str= ret.release();
+
+  reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE);
+
+  if (ret.error)
+    cache->error= -1;
+end:
+  return;
+}
+
+/**
+  The function invokes the base64 encoder on the current event string
+  and stores the result into two caches.
+  When the event ends the current statement, the caches are copied into
+  the argument file.
+  Copying also takes care of how to wrap the event, specifically to
+  produce valid SQL syntax.
+  When the encoded data size is within max(MAX_ALLOWED_PACKET),
+  a regular BINLOG query is composed. Otherwise it is built as fragmented:
+
+    SET @binlog_fragment_0='...';
+    SET @binlog_fragment_1='...';
+    BINLOG @binlog_fragment_0, @binlog_fragment_1;
+
+  where the fragments are represented by a pair of indexed user
+  "one shot" variables.
+
+  @note
+  If any changes are made here, don't forget to duplicate them in
+  Old_rows_log_event as long as it's supported.
+
+  @param file              pointer to IO_CACHE
+  @param print_event_info  pointer to print_event_info specifying
+                           what to print of the event and how
+  @param name              the name of a table that the event operates on
+
+  The function signals any error of cache access through setting
+  that cache's @c error to -1.
+*/
 void Rows_log_event::print_helper(FILE *file,
                                   PRINT_EVENT_INFO *print_event_info,
                                   char const *const name)
@@ -11378,32 +11560,44 @@ void Rows_log_event::print_helper(FILE *file,
 #ifdef WHEN_FLASHBACK_REVIEW_READY
   IO_CACHE *const sql= &print_event_info->review_sql_cache;
 #endif
+  bool do_print_encoded=
+    print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
+    !print_event_info->short_form;
 
   if (!print_event_info->short_form)
   {
+    bool const last_stmt_event= get_flags(STMT_END_F);
+    char llbuff[22];
+
     print_header(head, print_event_info, !last_stmt_event);
-    my_b_printf(head, "\t%s: table id %lu%s\n",
-                name, m_table_id,
+    my_b_printf(head, "\t%s: table id %s%s\n",
+                name, ullstr(m_table_id, llbuff),
" flags: STMT_END_F" : ""); - print_base64(body, print_event_info, !last_stmt_event); + print_base64(body, print_event_info, do_print_encoded); } if (get_flags(STMT_END_F)) { LEX_STRING tmp_str; - - copy_event_cache_to_string_and_reinit(head, &tmp_str); +#ifdef WHEN_FLASHBACK_REVIEW_READY + copy_event_cache_to_string_and_reinit(sql, &tmp_str); output_buf.append(&tmp_str); my_free(tmp_str.str); - copy_event_cache_to_string_and_reinit(body, &tmp_str); +#endif + if (copy_event_cache_to_string_and_reinit(head, &tmp_str)) + { + head->error= -1; + return; + } output_buf.append(&tmp_str); my_free(tmp_str.str); -#ifdef WHEN_FLASHBACK_REVIEW_READY - copy_event_cache_to_string_and_reinit(sql, &tmp_str); + + copy_cache_to_string_wrapped(body, &tmp_str, do_print_encoded, + print_event_info->delimiter, + print_event_info->verbose); output_buf.append(&tmp_str); my_free(tmp_str.str); -#endif } } #endif @@ -11651,7 +11845,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, uchar cbuf[MAX_INT_WIDTH]; uchar *cbuf_end; DBUG_ENTER("Table_map_log_event::Table_map_log_event(TABLE)"); - DBUG_ASSERT(m_table_id != ~0UL); + DBUG_ASSERT(m_table_id != ~0ULL); /* In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in table.cc / alloc_table_share(): @@ -11736,7 +11930,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len, #endif m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0), m_colcnt(0), m_coltype(0), - m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0), + m_memory(NULL), m_table_id(ULONGLONG_MAX), m_flags(0), m_data_size(0), m_field_metadata(0), m_field_metadata_size(0), m_null_bits(0), m_meta_memory(NULL) { @@ -11773,7 +11967,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len, post_start+= TM_FLAGS_OFFSET; } - DBUG_ASSERT(m_table_id != ~0UL); + DBUG_ASSERT(m_table_id != ~0ULL); m_flags= uint2korr(post_start); @@ -12093,7 +12287,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi) table_list->updating= 1; table_list->required_type= FRMTYPE_TABLE; - DBUG_PRINT("debug", ("table: %s is mapped to %lu", table_list->table_name, + DBUG_PRINT("debug", ("table: %s is mapped to %llu", table_list->table_name, table_list->table_id)); table_list->master_had_triggers= ((m_flags & TM_BIT_HAS_TRIGGERS_F) ? 
                                     1 : 0);
   DBUG_PRINT("debug", ("table->master_had_triggers=%d",
@@ -12194,7 +12388,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
 #ifndef MYSQL_CLIENT
 bool Table_map_log_event::write_data_header()
 {
-  DBUG_ASSERT(m_table_id != ~0UL);
+  DBUG_ASSERT(m_table_id != ~0ULL);
   uchar buf[TABLE_MAP_HEADER_LEN];
   DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
                   {
@@ -12202,7 +12396,7 @@ bool Table_map_log_event::write_data_header()
                     int2store(buf + 4, m_flags);
                     return (write_data(buf, 6));
                   });
-  int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
+  int6store(buf + TM_MAPID_OFFSET, m_table_id);
   int2store(buf + TM_FLAGS_OFFSET, m_flags);
   return write_data(buf, TABLE_MAP_HEADER_LEN);
 }
@@ -12252,7 +12446,7 @@ void Table_map_log_event::pack_info(Protocol *protocol)
 {
   char buf[256];
   size_t bytes= my_snprintf(buf, sizeof(buf),
-                            "table_id: %lu (%s.%s)",
+                            "table_id: %llu (%s.%s)",
                             m_table_id, m_dbnam, m_tblnam);
   protocol->store(buf, bytes, &my_charset_bin);
 }
@@ -12267,13 +12461,17 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
 {
   if (!print_event_info->short_form)
   {
+    char llbuff[22];
+
     print_header(&print_event_info->head_cache, print_event_info, TRUE);
     my_b_printf(&print_event_info->head_cache,
-                "\tTable_map: %`s.%`s mapped to number %lu%s\n",
-                m_dbnam, m_tblnam, m_table_id,
+                "\tTable_map: %`s.%`s mapped to number %s%s\n",
+                m_dbnam, m_tblnam, ullstr(m_table_id, llbuff),
                 ((m_flags & TM_BIT_HAS_TRIGGERS_F) ?
                  " (has triggers)" : ""));
-    print_base64(&print_event_info->body_cache, print_event_info, TRUE);
+    print_base64(&print_event_info->body_cache, print_event_info,
+                 print_event_info->base64_output_mode !=
+                 BASE64_OUTPUT_DECODE_ROWS);
     copy_event_cache_to_file_and_reinit(&print_event_info->head_cache, file);
   }
 }
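To make the wrapping rule of copy_cache_to_string_wrapped() easier to follow outside the IO_CACHE plumbing, here is a minimal standalone C++ sketch. It is illustrative only and not part of the patch: wrap_binlog_payload and max_encoded_size are hypothetical names, the payload is assumed to be base64 text produced elsewhere, verbose mode is ignored, and the size check is a simplified stand-in for the opt_binlog_rows_event_max_encoded_size estimate used in the diff.

  // Standalone illustration (not MariaDB code) of the same wrapping rule as
  // copy_cache_to_string_wrapped(), using std::string instead of IO_CACHE.
  #include <cstddef>
  #include <string>

  std::string wrap_binlog_payload(const std::string &encoded,   // base64 text
                                  const std::string &delimiter, // e.g. "/*!*/;"
                                  std::size_t max_encoded_size)
  {
    std::string out;
    if (encoded.size() + 32 <= max_encoded_size)  // rough allowance for the SQL cover
    {
      // Small enough: one self-contained BINLOG statement.
      out += "\nBINLOG '\n";
      out += encoded;
      out += "'" + delimiter + "\n";
    }
    else
    {
      // Too big for a single statement: split into two "one shot" user
      // variables; two fragments are enough to keep each statement under
      // the max_allowed_packet ceiling.
      const std::size_t half= encoded.size() / 2 + 1;
      out += "\nSET @binlog_fragment_0 ='\n";
      out += encoded.substr(0, half);
      out += "\n'" + delimiter;
      out += "\nSET @binlog_fragment_1 ='\n";
      out += encoded.substr(half);
      out += "'" + delimiter + "\n";
      out += "BINLOG @binlog_fragment_0, @binlog_fragment_1" + delimiter + "\n";
    }
    return out;
  }

With mysqlbinlog's usual delimiter the large-event path replays as two SET statements plus one BINLOG statement, so each statement sent to the server stays below the @@max_allowed_packet ceiling, which is the motivation stated in the comment inside the patched function.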