diff options
Diffstat (limited to 'sql/wsrep_binlog.cc')
-rw-r--r-- | sql/wsrep_binlog.cc | 419 |
1 files changed, 144 insertions, 275 deletions
diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc index 85c1deb0d71..787ebc042ae 100644 --- a/sql/wsrep_binlog.cc +++ b/sql/wsrep_binlog.cc @@ -14,12 +14,16 @@ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */ #include "mariadb.h" +#include "mysql/service_wsrep.h" #include "wsrep_binlog.h" #include "wsrep_priv.h" #include "log.h" +#include "slave.h" #include "log_event.h" #include "wsrep_applier.h" +#include "transaction.h" + extern handlerton *binlog_hton; /* Write the contents of a cache to a memory buffer. @@ -40,10 +44,10 @@ int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len) DBUG_RETURN(ER_ERROR_ON_WRITE); } - uint length = my_b_bytes_in_cache(cache); - if (unlikely(0 == length)) length = my_b_fill(cache); + uint length= my_b_bytes_in_cache(cache); + if (unlikely(0 == length)) length= my_b_fill(cache); - size_t total_length = 0; + size_t total_length= 0; if (likely(length > 0)) do { @@ -60,7 +64,7 @@ int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len) wsrep_max_ws_size, total_length); goto error; } - uchar* tmp = (uchar *)my_realloc(*buf, total_length, + uchar* tmp= (uchar *)my_realloc(*buf, total_length, MYF(MY_ALLOW_ZERO_PTR)); if (!tmp) { @@ -68,17 +72,17 @@ int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len) *buf_len, length); goto error; } - *buf = tmp; + *buf= tmp; memcpy(*buf + *buf_len, cache->read_pos, length); - *buf_len = total_length; + *buf_len= total_length; if (cache->file < 0) { cache->read_pos= cache->read_end; break; } - } while ((length = my_b_fill(cache))); + } while ((length= my_b_fill(cache))); if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) { @@ -104,137 +108,6 @@ cleanup: * many transactions would fit in there * so there is no need to reach for the heap */ -/* Returns minimum multiple of HEAP_PAGE_SIZE that is >= length */ -static inline size_t -heap_size(size_t length) -{ - return (length + HEAP_PAGE_SIZE - 1)/HEAP_PAGE_SIZE*HEAP_PAGE_SIZE; -} - -/* append data to writeset */ -static inline wsrep_status_t -wsrep_append_data(wsrep_t* const wsrep, - wsrep_ws_handle_t* const ws, - const void* const data, - size_t const len) -{ - struct wsrep_buf const buff = { data, len }; - wsrep_status_t const rc(wsrep->append_data(wsrep, ws, &buff, 1, - WSREP_DATA_ORDERED, true)); - DBUG_DUMP("buff", (uchar*) data, len); - if (rc != WSREP_OK) - { - WSREP_WARN("append_data() returned %d", rc); - } - - return rc; -} - -/* - Write the contents of a cache to wsrep provider. - - This function quite the same as MYSQL_BIN_LOG::write_cache(), - with the exception that here we write in buffer instead of log file. - - This version reads all of cache into single buffer and then appends to a - writeset at once. - */ -static int wsrep_write_cache_once(wsrep_t* const wsrep, - THD* const thd, - IO_CACHE* const cache, - size_t* const len) -{ - my_off_t const saved_pos(my_b_tell(cache)); - DBUG_ENTER("wsrep_write_cache_once"); - - if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) - { - WSREP_ERROR("failed to initialize io-cache"); - DBUG_RETURN(ER_ERROR_ON_WRITE); - } - - int err(WSREP_OK); - - size_t total_length(0); - uchar stack_buf[STACK_SIZE]; /* to avoid dynamic allocations for few data*/ - uchar* heap_buf(NULL); - uchar* buf(stack_buf); - size_t allocated(sizeof(stack_buf)); - size_t used(0); - - uint length(my_b_bytes_in_cache(cache)); - if (unlikely(0 == length)) length = my_b_fill(cache); - - if (likely(length > 0)) do - { - total_length += length; - /* - Bail out if buffer grows too large. - A temporary fix to avoid allocating indefinitely large buffer, - not a real limit on a writeset size which includes other things - like header and keys. - */ - if (unlikely(total_length > wsrep_max_ws_size)) - { - WSREP_WARN("transaction size limit (%lu) exceeded: %zu", - wsrep_max_ws_size, total_length); - err = WSREP_TRX_SIZE_EXCEEDED; - goto cleanup; - } - - if (total_length > allocated) - { - size_t const new_size(heap_size(total_length)); - uchar* tmp = (uchar *)my_realloc(heap_buf, new_size, - MYF(MY_ALLOW_ZERO_PTR)); - if (!tmp) - { - WSREP_ERROR("could not (re)allocate buffer: %zu + %u", - allocated, length); - err = WSREP_TRX_SIZE_EXCEEDED; - goto cleanup; - } - - heap_buf = tmp; - buf = heap_buf; - allocated = new_size; - - if (used <= STACK_SIZE && used > 0) // there's data in stack_buf - { - DBUG_ASSERT(buf == stack_buf); - memcpy(heap_buf, stack_buf, used); - } - } - - memcpy(buf + used, cache->read_pos, length); - used = total_length; - if (cache->file < 0) - { - cache->read_pos= cache->read_end; - break; - } - } while ((length = my_b_fill(cache))); - - if (used > 0) - err = wsrep_append_data(wsrep, &thd->wsrep_ws_handle, buf, used); - - if (WSREP_OK == err) *len = total_length; - -cleanup: - if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) - { - WSREP_ERROR("failed to reinitialize io-cache"); - } - - if (unlikely(WSREP_OK != err)) - { - wsrep_dump_rbr_buf_with_header(thd, buf, used); - } - - my_free(heap_buf); - DBUG_RETURN(err); -} - /* Write the contents of a cache to wsrep provider. @@ -243,62 +116,58 @@ cleanup: This version uses incremental data appending as it reads it from cache. */ -static int wsrep_write_cache_inc(wsrep_t* const wsrep, - THD* const thd, +static int wsrep_write_cache_inc(THD* const thd, IO_CACHE* const cache, size_t* const len) { - my_off_t const saved_pos(my_b_tell(cache)); - DBUG_ENTER("wsrep_write_cache_inc"); - - if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) - { - WSREP_ERROR("failed to initialize io-cache"); - DBUG_RETURN(WSREP_TRX_ERROR); - } + DBUG_ENTER("wsrep_write_cache_inc"); + my_off_t const saved_pos(my_b_tell(cache)); - int err(WSREP_OK); + if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0)) + { + WSREP_ERROR("failed to initialize io-cache"); + DBUG_RETURN(1);; + } - size_t total_length(0); + int ret= 0; + size_t total_length(0); - uint length(my_b_bytes_in_cache(cache)); - if (unlikely(0 == length)) length = my_b_fill(cache); + uint length(my_b_bytes_in_cache(cache)); + if (unlikely(0 == length)) length= my_b_fill(cache); - if (likely(length > 0)) do + if (likely(length > 0)) + { + do { - total_length += length; - /* bail out if buffer grows too large - not a real limit on a writeset size which includes other things - like header and keys. - */ - if (unlikely(total_length > wsrep_max_ws_size)) - { - WSREP_WARN("transaction size limit (%lu) exceeded: %zu", - wsrep_max_ws_size, total_length); - err = WSREP_TRX_SIZE_EXCEEDED; - goto cleanup; - } - - if(WSREP_OK != (err=wsrep_append_data(wsrep, &thd->wsrep_ws_handle, - cache->read_pos, length))) - goto cleanup; - - if (cache->file < 0) - { - cache->read_pos= cache->read_end; - break; - } - } while ((length = my_b_fill(cache))); - - if (WSREP_OK == err) *len = total_length; + total_length += length; + /* bail out if buffer grows too large + not a real limit on a writeset size which includes other things + like header and keys. + */ + if (unlikely(total_length > wsrep_max_ws_size)) + { + WSREP_WARN("transaction size limit (%lu) exceeded: %zu", + wsrep_max_ws_size, total_length); + ret= 1; + goto cleanup; + } + if (thd->wsrep_cs().append_data(wsrep::const_buffer(cache->read_pos, length))) + goto cleanup; + cache->read_pos= cache->read_end; + } while ((cache->file >= 0) && (length= my_b_fill(cache))); + } + if (ret == 0) + { + assert(total_length + thd->wsrep_sr().log_position() == saved_pos); + } cleanup: - if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) - { - WSREP_ERROR("failed to reinitialize io-cache"); - } - - DBUG_RETURN(err); + *len= total_length; + if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) + { + WSREP_ERROR("failed to reinitialize io-cache"); + } + DBUG_RETURN(ret); } /* @@ -307,17 +176,11 @@ cleanup: This function quite the same as MYSQL_BIN_LOG::write_cache(), with the exception that here we write in buffer instead of log file. */ -int wsrep_write_cache(wsrep_t* const wsrep, - THD* const thd, +int wsrep_write_cache(THD* const thd, IO_CACHE* const cache, size_t* const len) { - if (wsrep_incremental_data_collection) { - return wsrep_write_cache_inc(wsrep, thd, cache, len); - } - else { - return wsrep_write_cache_once(wsrep, thd, cache, len); - } + return wsrep_write_cache_inc(thd, cache, len); } void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len) @@ -365,89 +228,6 @@ void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len) free(filename); } -/* - wsrep exploits binlog's caches even if binlogging itself is not - activated. In such case connection close needs calling - actual binlog's method. - Todo: split binlog hton from its caches to use ones by wsrep - without referring to binlog's stuff. -*/ -int wsrep_binlog_close_connection(THD* thd) -{ - DBUG_ENTER("wsrep_binlog_close_connection"); - if (thd_get_ha_data(thd, binlog_hton) != NULL) - binlog_hton->close_connection (binlog_hton, thd); - DBUG_RETURN(0); -} - -#if 0 -void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache) -{ - char filename[PATH_MAX]= {0}; - int len= snprintf(filename, PATH_MAX, "%s/GRA_%lld_%lld.log", - wsrep_data_home_dir, (longlong) thd->thread_id, - (longlong) wsrep_thd_trx_seqno(thd)); - size_t bytes_in_cache = 0; - // check path - if (len >= PATH_MAX) - { - WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len); - return ; - } - // init cache - my_off_t const saved_pos(my_b_tell(cache)); - if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) - { - WSREP_ERROR("failed to initialize io-cache"); - return ; - } - // open file - FILE* of = fopen(filename, "wb"); - if (!of) - { - WSREP_ERROR("Failed to open file '%s': %d (%s)", - filename, errno, strerror(errno)); - goto cleanup; - } - // ready to write - bytes_in_cache= my_b_bytes_in_cache(cache); - if (unlikely(bytes_in_cache == 0)) bytes_in_cache = my_b_fill(cache); - if (likely(bytes_in_cache > 0)) do - { - if (my_fwrite(of, cache->read_pos, bytes_in_cache, - MYF(MY_WME | MY_NABP)) == (size_t) -1) - { - WSREP_ERROR("Failed to write file '%s'", filename); - goto cleanup; - } - - if (cache->file < 0) - { - cache->read_pos= cache->read_end; - break; - } - } while ((bytes_in_cache= my_b_fill(cache))); - if (cache->error == -1) - { - WSREP_ERROR("RBR inconsistent"); - goto cleanup; - } -cleanup: - // init back - if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) - { - WSREP_ERROR("failed to reinitialize io-cache"); - } - // close file - if (of) fclose(of); -} -#endif - -void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) -{ - thd->binlog_flush_pending_rows_event(stmt_end); -} - /* Dump replication buffer along with header to a file. */ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf, size_t buf_len) @@ -529,3 +309,92 @@ cleanup1: DBUG_VOID_RETURN; } +int wsrep_write_skip_event(THD* thd) +{ + DBUG_ENTER("wsrep_write_skip_event"); + Ignorable_log_event skip_event(thd); + int ret= mysql_bin_log.write_event(&skip_event); + if (ret) + { + WSREP_WARN("wsrep_write_skip_event: write to binlog failed: %d", ret); + } + if (!ret && (ret= trans_commit_stmt(thd))) + { + WSREP_WARN("wsrep_write_skip_event: statt commit failed"); + } + DBUG_RETURN(ret); +} + +int wsrep_write_dummy_event_low(THD *thd, const char *msg) +{ + ::abort(); + return 0; +} + +int wsrep_write_dummy_event(THD *orig_thd, const char *msg) +{ + return 0; +} + +bool wsrep_commit_will_write_binlog(THD *thd) +{ + return (!wsrep_emulate_bin_log && /* binlog enabled*/ + (wsrep_thd_is_local(thd) || /* local thd*/ + (thd->wsrep_applier_service && /* applier and log-slave-updates */ + opt_log_slave_updates))); +} + +/* + The last THD/commit_for_wait registered for group commit. +*/ +static wait_for_commit *commit_order_tail= NULL; + +void wsrep_register_for_group_commit(THD *thd) +{ + DBUG_ENTER("wsrep_register_for_group_commit"); + if (wsrep_emulate_bin_log) + { + /* Binlog is off, no need to maintain group commit queue */ + DBUG_VOID_RETURN; + } + + DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_committing); + + wait_for_commit *wfc= thd->wait_for_commit_ptr= &thd->wsrep_wfc; + + mysql_mutex_lock(&LOCK_wsrep_group_commit); + if (commit_order_tail) + { + wfc->register_wait_for_prior_commit(commit_order_tail); + } + commit_order_tail= thd->wait_for_commit_ptr; + mysql_mutex_unlock(&LOCK_wsrep_group_commit); + + /* + Now we have queued for group commit. If the commit will go + through TC log_and_order(), the commit ordering is done + by TC group commit. Otherwise the wait for prior + commits to complete is done in ha_commit_one_phase(). + */ + DBUG_VOID_RETURN; +} + +void wsrep_unregister_from_group_commit(THD *thd) +{ + DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_ordered_commit); + wait_for_commit *wfc= thd->wait_for_commit_ptr; + + if (wfc) + { + mysql_mutex_lock(&LOCK_wsrep_group_commit); + wfc->unregister_wait_for_prior_commit(); + thd->wakeup_subsequent_commits(0); + + /* The last one queued for group commit has completed commit, it is + safe to set tail to NULL. */ + if (wfc == commit_order_tail) + commit_order_tail= NULL; + mysql_mutex_unlock(&LOCK_wsrep_group_commit); + thd->wait_for_commit_ptr= NULL; + } +} |