diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/item_func.cc | 2 | ||||
-rw-r--r-- | sql/item_func.h | 4 | ||||
-rw-r--r-- | sql/log_event.cc | 169 | ||||
-rw-r--r-- | sql/log_event.h | 13 | ||||
-rw-r--r-- | sql/log_event_old.cc | 16 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 94 | ||||
-rw-r--r-- | sql/sql_lex.h | 4 | ||||
-rw-r--r-- | sql/sql_yacc.yy | 11 |
8 files changed, 277 insertions, 36 deletions
diff --git a/sql/item_func.cc b/sql/item_func.cc index 8b1c7b3bc61..169eb76d802 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -4828,7 +4828,7 @@ bool Item_func_set_user_var::register_field_in_bitmap(uchar *arg) true failure */ -static bool +bool update_hash(user_var_entry *entry, bool set_null, void *ptr, uint length, Item_result type, CHARSET_INFO *cs, bool unsigned_arg) diff --git a/sql/item_func.h b/sql/item_func.h index 2c0e3a62f6a..e3eab02f213 100644 --- a/sql/item_func.h +++ b/sql/item_func.h @@ -2283,4 +2283,8 @@ bool eval_const_cond(COND *cond); extern bool volatile mqh_used; +bool update_hash(user_var_entry *entry, bool set_null, void *ptr, uint length, + Item_result type, CHARSET_INFO *cs, + bool unsigned_arg); + #endif /* ITEM_FUNC_INCLUDED */ diff --git a/sql/log_event.cc b/sql/log_event.cc index 883f1863ac4..4b3bad1fbc7 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2783,9 +2783,23 @@ void free_table_map_log_event(Table_map_log_event *event) delete event; } +/** + Encode the event, optionally per 'do_print_encoded' arg store the + result into the argument cache; optionally per event_info's + 'verbose' print into the cache a verbose representation of the event. + Note, no extra wrapping is done to the being io-cached data, like + to producing a BINLOG query. It's left for a routine that extracts from + the cache. + + @param file pointer to IO_CACHE + @param print_event_info pointer to print_event_info specializing + what out of and how to print the event + @param do_print_encoded whether to store base64-encoded event + into @file. +*/ void Log_event::print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, - bool more) + bool do_print_encoded) { const uchar *ptr= (const uchar *)temp_buf; uint32 size= uint4korr(ptr + EVENT_LEN_OFFSET); @@ -2804,17 +2818,9 @@ void Log_event::print_base64(IO_CACHE* file, DBUG_ASSERT(0); } - if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) - { - if (my_b_tell(file) == 0) - my_b_write_string(file, "\nBINLOG '\n"); - + if (do_print_encoded) my_b_printf(file, "%s\n", tmp_str); - if (!more) - my_b_printf(file, "'%s\n", print_event_info->delimiter); - } - if (print_event_info->verbose) { Rows_log_event *ev= NULL; @@ -4851,9 +4857,17 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER && !print_event_info->short_form) { - if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) + /* BINLOG is matched with the delimiter below on the same level */ + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS; + if (do_print_encoded) my_b_printf(&cache, "BINLOG '\n"); - print_base64(&cache, print_event_info, FALSE); + + print_base64(&cache, print_event_info, do_print_encoded); + + if (do_print_encoded) + my_b_printf(&cache, "'%s\n", print_event_info->delimiter); + print_event_info->printed_fd_event= TRUE; } DBUG_VOID_RETURN; @@ -10491,12 +10505,128 @@ void Rows_log_event::pack_info(Protocol *protocol) #endif #ifdef MYSQL_CLIENT +/** + Print an event "body" cache to @c file possibly in two fragments. + Each fragement is optionally per @c do_wrap to produce an SQL statement. + + @param file a file to print to + @param body the "body" IO_CACHE of event + @param do_wrap whether to wrap base64-encoded strings with + SQL cover. + @param delimiter delimiter string + + The function signals on any error through setting @c body->error to -1. +*/ +void copy_cache_to_file_wrapped(FILE *file, + IO_CACHE *body, + bool do_wrap, + const char *delimiter) +{ + const char str_binlog[]= "\nBINLOG '\n"; + const char fmt_delim[]= "'%s\n"; + const char fmt_n_delim[]= "\n'%s\n"; + const my_off_t cache_size= my_b_tell(body); + + if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE)) + { + body->error= -1; + goto end; + } + + if (!do_wrap) + { + my_b_copy_to_file(body, file, SIZE_T_MAX); + } + else if (4 + sizeof(str_binlog) + cache_size + sizeof(fmt_delim) > + opt_binlog_rows_event_max_encoded_size) + { + /* + 2 fragments can always represent near 1GB row-based + base64-encoded event as two strings each of size less than + max(max_allowed_packet). Greater number of fragments does not + save from potential need to tweak (increase) @@max_allowed_packet + before to process the fragments. So 2 is safe and enough. + + Split the big query when its packet size's estimation exceeds a + limit. The estimate includes the maximum packet header + contribution of non-compressed packet. + */ + const char fmt_frag[]= "%sSET @binlog_fragment_%d ='\n"; + + my_fprintf(file, fmt_frag, "\n", 0); + if (my_b_copy_to_file(body, file, cache_size/2 + 1)) + { + body->error= -1; + goto end; + } + my_fprintf(file, fmt_n_delim, delimiter); + + my_fprintf(file, fmt_frag, "", 1); + if (my_b_copy_to_file(body, file, SIZE_T_MAX)) + { + body->error= -1; + goto end; + } + my_fprintf(file, fmt_delim, delimiter); + + my_fprintf(file, "BINLOG @binlog_fragment_%d, @binlog_fragment_%d%s\n", + 0, 1, delimiter); + } + else + { + my_fprintf(file, str_binlog); + if (my_b_copy_to_file(body, file, SIZE_T_MAX)) + { + body->error= -1; + goto end; + } + my_fprintf(file, fmt_delim, delimiter); + } + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE); + +end: + return; +} + +/** + The function invokes base64 encoder to run on the current + event string and store the result into two caches. + When the event ends the current statement the caches are is copied into + the argument file. + Copying is also concerned how to wrap the event, specifically to produce + a valid SQL syntax. + When the encoded data size is within max(MAX_ALLOWED_PACKET) + a regular BINLOG query is composed. Otherwise it is build as fragmented + + SET @binlog_fragment_0='...'; + SET @binlog_fragment_1='...'; + BINLOG @binlog_fragment_0, @binlog_fragment_1; + + where fragments are represented by a pair of indexed user + "one shot" variables. + + NOTE. + If any changes made don't forget to duplicate them to + Old_rows_log_event as long as it's supported. + + @param file pointer to IO_CACHE + @param print_event_info pointer to print_event_info specializing + what out of and how to print the event + @param name the name of a table that the event operates on + + The function signals on any error of cache access through setting + that cache's @c error to -1. +*/ void Rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) { IO_CACHE *const head= &print_event_info->head_cache; IO_CACHE *const body= &print_event_info->body_cache; + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + if (!print_event_info->short_form) { bool const last_stmt_event= get_flags(STMT_END_F); @@ -10504,13 +10634,18 @@ void Rows_log_event::print_helper(FILE *file, my_b_printf(head, "\t%s: table id %lu%s\n", name, m_table_id, last_stmt_event ? " flags: STMT_END_F" : ""); - print_base64(body, print_event_info, !last_stmt_event); + print_base64(body, print_event_info, do_print_encoded); } if (get_flags(STMT_END_F)) { - copy_event_cache_to_file_and_reinit(head, file); - copy_event_cache_to_file_and_reinit(body, file); + if (copy_event_cache_to_file_and_reinit(head, file)) + { + head->error= -1; + return; + } + copy_cache_to_file_wrapped(file, body, do_print_encoded, + print_event_info->delimiter); } } #endif @@ -11379,7 +11514,9 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) m_dbnam, m_tblnam, m_table_id, ((m_flags & TM_BIT_HAS_TRIGGERS_F) ? " (has triggers)" : "")); - print_base64(&print_event_info->body_cache, print_event_info, TRUE); + print_base64(&print_event_info->body_cache, print_event_info, + print_event_info->base64_output_mode != + BASE64_OUTPUT_DECODE_ROWS); copy_event_cache_to_file_and_reinit(&print_event_info->head_cache, file); } } diff --git a/sql/log_event.h b/sql/log_event.h index 90900f63533..446bd8cb827 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1157,7 +1157,7 @@ public: void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, bool is_more); void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, - bool is_more); + bool do_print_encoded); #endif /* read_log_event() functions read an event from a binlog or relay @@ -4891,15 +4891,22 @@ public: virtual int get_data_size() { return IGNORABLE_HEADER_LEN; } }; +#ifdef MYSQL_CLIENT +void copy_cache_to_file_wrapped(FILE *file, + PRINT_EVENT_INFO *print_event_info, + IO_CACHE *body, + bool do_wrap); +#endif static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, FILE *file) { - return - my_b_copy_to_file(cache, file) || + return + my_b_copy_all_to_file(cache, file) || reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE); } + #ifdef MYSQL_SERVER /***************************************************************************** diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index d2b4470bbf9..a6f2ed3f416 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -1850,12 +1850,17 @@ void Old_rows_log_event::pack_info(Protocol *protocol) #ifdef MYSQL_CLIENT +/* Method duplicates Rows_log_event's one */ void Old_rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) { IO_CACHE *const head= &print_event_info->head_cache; IO_CACHE *const body= &print_event_info->body_cache; + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + if (!print_event_info->short_form) { bool const last_stmt_event= get_flags(STMT_END_F); @@ -1863,13 +1868,18 @@ void Old_rows_log_event::print_helper(FILE *file, my_b_printf(head, "\t%s: table id %lu%s\n", name, m_table_id, last_stmt_event ? " flags: STMT_END_F" : ""); - print_base64(body, print_event_info, !last_stmt_event); + print_base64(body, print_event_info, do_print_encoded); } if (get_flags(STMT_END_F)) { - copy_event_cache_to_file_and_reinit(head, file); - copy_event_cache_to_file_and_reinit(body, file); + if (copy_event_cache_to_file_and_reinit(head, file)) + { + head->error= -1; + return; + } + copy_cache_to_file_wrapped(file, body, do_print_encoded, + print_event_info->delimiter); } } #endif diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 91cf038907e..2e861d00f10 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -28,6 +28,65 @@ // START_EVENT_V3, // Log_event_type, // Log_event + +/** + Copy fragments into the standard placeholder thd->lex->comment.str. + + Compute the size of the (still) encoded total, + allocate and then copy fragments one after another. + The size can exceed max(max_allowed_packet) which is not a + problem as no String instance is created off this char array. + + @param thd THD handle + @return + 0 at success, + -1 otherwise. +*/ +int binlog_defragment(THD *thd) +{ + user_var_entry *entry[2]; + LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident }; + + /* compute the total size */ + thd->lex->comment.str= NULL; + thd->lex->comment.length= 0; + for (uint k= 0; k < 2; k++) + { + entry[k]= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str, + name[k].length); + if (!entry[k] || entry[k]->type != STRING_RESULT) + { + my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str); + return -1; + } + thd->lex->comment.length += entry[k]->length; + } + + thd->lex->comment.str= // to be freed by the caller + (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME)); + if (!thd->lex->comment.str) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + return -1; + } + + /* fragments are merged into allocated buf while the user var:s get reset */ + size_t gathered_length= 0; + for (uint k=0; k < 2; k++) + { + memcpy(thd->lex->comment.str + gathered_length, entry[k]->value, + entry[k]->length); + gathered_length += entry[k]->length; + update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0); + } + + DBUG_ASSERT(gathered_length == thd->lex->comment.length); + + return 0; +} + + /** Execute a BINLOG statement. @@ -53,14 +112,6 @@ void mysql_client_binlog_statement(THD* thd) if (check_global_access(thd, SUPER_ACL)) DBUG_VOID_RETURN; - size_t coded_len= thd->lex->comment.length; - if (!coded_len) - { - my_error(ER_SYNTAX_ERROR, MYF(0)); - DBUG_VOID_RETURN; - } - size_t decoded_len= base64_needed_decoded_length(coded_len); - /* option_bits will be changed when applying the event. But we don't expect it be changed permanently after BINLOG statement, so backup it first. @@ -81,6 +132,8 @@ void mysql_client_binlog_statement(THD* thd) int err; Relay_log_info *rli; rpl_group_info *rgi; + char *buf= NULL; + size_t coded_len= 0, decoded_len= 0; rli= thd->rli_fake; if (!rli) @@ -102,15 +155,13 @@ void mysql_client_binlog_statement(THD* thd) rgi->thd= thd; const char *error= 0; - char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME)); Log_event *ev = 0; + my_bool is_fragmented= FALSE; /* Out of memory check */ - if (!(rli && - rli->relay_log.description_event_for_exec && - buf)) + if (!(rli && rli->relay_log.description_event_for_exec)) { my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); /* needed 1 bytes */ goto end; @@ -119,6 +170,23 @@ void mysql_client_binlog_statement(THD* thd) rli->sql_driver_thd= thd; rli->no_storage= TRUE; + if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str)) + if (binlog_defragment(thd)) + goto end; + + if (!(coded_len= thd->lex->comment.length)) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + decoded_len= base64_needed_decoded_length(coded_len); + if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + goto end; + } + for (char const *strptr= thd->lex->comment.str ; strptr < thd->lex->comment.str + thd->lex->comment.length ; ) { @@ -272,6 +340,8 @@ void mysql_client_binlog_statement(THD* thd) my_ok(thd); end: + if (unlikely(is_fragmented)) + my_free(thd->lex->comment.str); thd->variables.option_bits= thd_options; rgi->slave_close_thread_tables(thd); my_free(buf); diff --git a/sql/sql_lex.h b/sql/sql_lex.h index f1edc809579..edc647522d3 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -2459,6 +2459,10 @@ struct LEX: public Query_tables_list String *wild; /* Wildcard in SHOW {something} LIKE 'wild'*/ sql_exchange *exchange; select_result *result; + /** + @c the two may also hold BINLOG arguments: either comment holds a + base64-char string or both represent the BINLOG fragment user variables. + */ LEX_STRING comment, ident; LEX_USER *grant_user; XID *xid; diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 0b85b597baf..bfaa0a60a24 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -7950,8 +7950,17 @@ binlog_base64_event: { Lex->sql_command = SQLCOM_BINLOG_BASE64_EVENT; Lex->comment= $2; + Lex->ident.str= NULL; + Lex->ident.length= 0; } - ; + | + BINLOG_SYM '@' ident_or_text ',' '@' ident_or_text + { + Lex->sql_command = SQLCOM_BINLOG_BASE64_EVENT; + Lex->comment= $3; + Lex->ident= $6; + } + ; check_view_or_table: table_or_tables table_list opt_mi_check_type |