summary | refs | log | tree | commit | diff
path: root/sql/wsrep_binlog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_binlog.cc')
-rw-r--r-- sql/wsrep_binlog.cc 419
1 files changed, 144 insertions, 275 deletions
diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc
index 85c1deb0d71..787ebc042ae 100644
--- a/sql/wsrep_binlog.cc
+++ b/sql/wsrep_binlog.cc
@@ -14,12 +14,16 @@
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA. */
#include "mariadb.h"
+#include "mysql/service_wsrep.h"
#include "wsrep_binlog.h"
#include "wsrep_priv.h"
#include "log.h"
+#include "slave.h"
#include "log_event.h"
#include "wsrep_applier.h"
+#include "transaction.h"
+
extern handlerton *binlog_hton;
/*
Write the contents of a cache to a memory buffer.
@@ -40,10 +44,10 @@ int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
DBUG_RETURN(ER_ERROR_ON_WRITE);
}
- uint length = my_b_bytes_in_cache(cache);
- if (unlikely(0 == length)) length = my_b_fill(cache);
+ uint length= my_b_bytes_in_cache(cache);
+ if (unlikely(0 == length)) length= my_b_fill(cache);
- size_t total_length = 0;
+ size_t total_length= 0;
if (likely(length > 0)) do
{
@@ -60,7 +64,7 @@ int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
wsrep_max_ws_size, total_length);
goto error;
}
- uchar* tmp = (uchar *)my_realloc(*buf, total_length,
+ uchar* tmp= (uchar *)my_realloc(*buf, total_length,
MYF(MY_ALLOW_ZERO_PTR));
if (!tmp)
{
@@ -68,17 +72,17 @@ int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
*buf_len, length);
goto error;
}
- *buf = tmp;
+ *buf= tmp;
memcpy(*buf + *buf_len, cache->read_pos, length);
- *buf_len = total_length;
+ *buf_len= total_length;
if (cache->file < 0)
{
cache->read_pos= cache->read_end;
break;
}
- } while ((length = my_b_fill(cache)));
+ } while ((length= my_b_fill(cache)));
if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
{
@@ -104,137 +108,6 @@ cleanup:
* many transactions would fit in there
* so there is no need to reach for the heap */
-/* Returns minimum multiple of HEAP_PAGE_SIZE that is >= length */
-static inline size_t
-heap_size(size_t length)
-{
- return (length + HEAP_PAGE_SIZE - 1)/HEAP_PAGE_SIZE*HEAP_PAGE_SIZE;
-}
-
-/* append data to writeset */
-static inline wsrep_status_t
-wsrep_append_data(wsrep_t* const wsrep,
- wsrep_ws_handle_t* const ws,
- const void* const data,
- size_t const len)
-{
- struct wsrep_buf const buff = { data, len };
- wsrep_status_t const rc(wsrep->append_data(wsrep, ws, &buff, 1,
- WSREP_DATA_ORDERED, true));
- DBUG_DUMP("buff", (uchar*) data, len);
- if (rc != WSREP_OK)
- {
- WSREP_WARN("append_data() returned %d", rc);
- }
-
- return rc;
-}
-
-/*
- Write the contents of a cache to wsrep provider.
-
- This function quite the same as MYSQL_BIN_LOG::write_cache(),
- with the exception that here we write in buffer instead of log file.
-
- This version reads all of cache into single buffer and then appends to a
- writeset at once.
- */
-static int wsrep_write_cache_once(wsrep_t* const wsrep,
- THD* const thd,
- IO_CACHE* const cache,
- size_t* const len)
-{
- my_off_t const saved_pos(my_b_tell(cache));
- DBUG_ENTER("wsrep_write_cache_once");
-
- if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
- {
- WSREP_ERROR("failed to initialize io-cache");
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- }
-
- int err(WSREP_OK);
-
- size_t total_length(0);
- uchar stack_buf[STACK_SIZE]; /* to avoid dynamic allocations for few data*/
- uchar* heap_buf(NULL);
- uchar* buf(stack_buf);
- size_t allocated(sizeof(stack_buf));
- size_t used(0);
-
- uint length(my_b_bytes_in_cache(cache));
- if (unlikely(0 == length)) length = my_b_fill(cache);
-
- if (likely(length > 0)) do
- {
- total_length += length;
- /*
- Bail out if buffer grows too large.
- A temporary fix to avoid allocating indefinitely large buffer,
- not a real limit on a writeset size which includes other things
- like header and keys.
- */
- if (unlikely(total_length > wsrep_max_ws_size))
- {
- WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
- wsrep_max_ws_size, total_length);
- err = WSREP_TRX_SIZE_EXCEEDED;
- goto cleanup;
- }
-
- if (total_length > allocated)
- {
- size_t const new_size(heap_size(total_length));
- uchar* tmp = (uchar *)my_realloc(heap_buf, new_size,
- MYF(MY_ALLOW_ZERO_PTR));
- if (!tmp)
- {
- WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
- allocated, length);
- err = WSREP_TRX_SIZE_EXCEEDED;
- goto cleanup;
- }
-
- heap_buf = tmp;
- buf = heap_buf;
- allocated = new_size;
-
- if (used <= STACK_SIZE && used > 0) // there's data in stack_buf
- {
- DBUG_ASSERT(buf == stack_buf);
- memcpy(heap_buf, stack_buf, used);
- }
- }
-
- memcpy(buf + used, cache->read_pos, length);
- used = total_length;
- if (cache->file < 0)
- {
- cache->read_pos= cache->read_end;
- break;
- }
- } while ((length = my_b_fill(cache)));
-
- if (used > 0)
- err = wsrep_append_data(wsrep, &thd->wsrep_ws_handle, buf, used);
-
- if (WSREP_OK == err) *len = total_length;
-
-cleanup:
- if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
- {
- WSREP_ERROR("failed to reinitialize io-cache");
- }
-
- if (unlikely(WSREP_OK != err))
- {
- wsrep_dump_rbr_buf_with_header(thd, buf, used);
- }
-
- my_free(heap_buf);
- DBUG_RETURN(err);
-}
-
/*
Write the contents of a cache to wsrep provider.
@@ -243,62 +116,58 @@ cleanup:
This version uses incremental data appending as it reads it from cache.
*/
+/*
+ Returns 0 when every chunk read from the cache was appended, 1 otherwise
+ (io-cache could not be switched to read mode, size limit exceeded, or
+ append_data() reported a failure). Whenever reading was started, *len is
+ set to the number of bytes read from the cache, on success and on failure.
+*/
-static int wsrep_write_cache_inc(wsrep_t* const wsrep,
- THD* const thd,
+static int wsrep_write_cache_inc(THD* const thd,
IO_CACHE* const cache,
size_t* const len)
{
- my_off_t const saved_pos(my_b_tell(cache));
- DBUG_ENTER("wsrep_write_cache_inc");
-
- if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
- {
- WSREP_ERROR("failed to initialize io-cache");
- DBUG_RETURN(WSREP_TRX_ERROR);
- }
+ DBUG_ENTER("wsrep_write_cache_inc");
+ /* Remember the write position so the cache can be put back in write mode. */
+ my_off_t const saved_pos(my_b_tell(cache));
- int err(WSREP_OK);
+ /* Read starts at thd->wsrep_sr().log_position(), not at offset 0. */
+ if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ DBUG_RETURN(1);
+ }
- size_t total_length(0);
+ int ret= 0;
+ size_t total_length(0);
- uint length(my_b_bytes_in_cache(cache));
- if (unlikely(0 == length)) length = my_b_fill(cache);
+ uint length(my_b_bytes_in_cache(cache));
+ if (unlikely(0 == length)) length= my_b_fill(cache);
- if (likely(length > 0)) do
+ if (likely(length > 0))
+ {
+ do
{
- total_length += length;
- /* bail out if buffer grows too large
- not a real limit on a writeset size which includes other things
- like header and keys.
- */
- if (unlikely(total_length > wsrep_max_ws_size))
- {
- WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
- wsrep_max_ws_size, total_length);
- err = WSREP_TRX_SIZE_EXCEEDED;
- goto cleanup;
- }
-
- if(WSREP_OK != (err=wsrep_append_data(wsrep, &thd->wsrep_ws_handle,
- cache->read_pos, length)))
- goto cleanup;
-
- if (cache->file < 0)
- {
- cache->read_pos= cache->read_end;
- break;
- }
- } while ((length = my_b_fill(cache)));
-
- if (WSREP_OK == err) *len = total_length;
+ total_length += length;
+ /* bail out if buffer grows too large
+ not a real limit on a writeset size which includes other things
+ like header and keys.
+ */
+ if (unlikely(total_length > wsrep_max_ws_size))
+ {
+ WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
+ wsrep_max_ws_size, total_length);
+ ret= 1;
+ goto cleanup;
+ }
+ if (thd->wsrep_cs().append_data(wsrep::const_buffer(cache->read_pos, length)))
+ {
+ /* A non-zero result means the chunk was not appended; report it to
+ the caller instead of leaving ret at 0. */
+ ret= 1;
+ goto cleanup;
+ }
+ /* Mark the current chunk as consumed before asking for the next one. */
+ cache->read_pos= cache->read_end;
+ } while ((cache->file >= 0) && (length= my_b_fill(cache)));
+ }
+ if (ret == 0)
+ {
+ /* Everything between the start offset and the saved write position
+ must have been read. */
+ assert(total_length + thd->wsrep_sr().log_position() == saved_pos);
+ }
cleanup:
- if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
- {
- WSREP_ERROR("failed to reinitialize io-cache");
- }
-
- DBUG_RETURN(err);
+ *len= total_length;
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_ERROR("failed to reinitialize io-cache");
+ }
+ DBUG_RETURN(ret);
}
/*
@@ -307,17 +176,11 @@ cleanup:
This function quite the same as MYSQL_BIN_LOG::write_cache(),
with the exception that here we write in buffer instead of log file.
*/
-int wsrep_write_cache(wsrep_t* const wsrep,
- THD* const thd,
+int wsrep_write_cache(THD* const thd,
IO_CACHE* const cache,
size_t* const len)
{
- if (wsrep_incremental_data_collection) {
- return wsrep_write_cache_inc(wsrep, thd, cache, len);
- }
- else {
- return wsrep_write_cache_once(wsrep, thd, cache, len);
- }
+ /* Always delegates to the incremental writer and returns its result. */
+ return wsrep_write_cache_inc(thd, cache, len);
}
void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
@@ -365,89 +228,6 @@ void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
free(filename);
}
-/*
- wsrep exploits binlog's caches even if binlogging itself is not
- activated. In such case connection close needs calling
- actual binlog's method.
- Todo: split binlog hton from its caches to use ones by wsrep
- without referring to binlog's stuff.
-*/
-int wsrep_binlog_close_connection(THD* thd)
-{
- DBUG_ENTER("wsrep_binlog_close_connection");
- if (thd_get_ha_data(thd, binlog_hton) != NULL)
- binlog_hton->close_connection (binlog_hton, thd);
- DBUG_RETURN(0);
-}
-
-#if 0
-void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache)
-{
- char filename[PATH_MAX]= {0};
- int len= snprintf(filename, PATH_MAX, "%s/GRA_%lld_%lld.log",
- wsrep_data_home_dir, (longlong) thd->thread_id,
- (longlong) wsrep_thd_trx_seqno(thd));
- size_t bytes_in_cache = 0;
- // check path
- if (len >= PATH_MAX)
- {
- WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
- return ;
- }
- // init cache
- my_off_t const saved_pos(my_b_tell(cache));
- if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
- {
- WSREP_ERROR("failed to initialize io-cache");
- return ;
- }
- // open file
- FILE* of = fopen(filename, "wb");
- if (!of)
- {
- WSREP_ERROR("Failed to open file '%s': %d (%s)",
- filename, errno, strerror(errno));
- goto cleanup;
- }
- // ready to write
- bytes_in_cache= my_b_bytes_in_cache(cache);
- if (unlikely(bytes_in_cache == 0)) bytes_in_cache = my_b_fill(cache);
- if (likely(bytes_in_cache > 0)) do
- {
- if (my_fwrite(of, cache->read_pos, bytes_in_cache,
- MYF(MY_WME | MY_NABP)) == (size_t) -1)
- {
- WSREP_ERROR("Failed to write file '%s'", filename);
- goto cleanup;
- }
-
- if (cache->file < 0)
- {
- cache->read_pos= cache->read_end;
- break;
- }
- } while ((bytes_in_cache= my_b_fill(cache)));
- if (cache->error == -1)
- {
- WSREP_ERROR("RBR inconsistent");
- goto cleanup;
- }
-cleanup:
- // init back
- if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
- {
- WSREP_ERROR("failed to reinitialize io-cache");
- }
- // close file
- if (of) fclose(of);
-}
-#endif
-
-void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
-{
- thd->binlog_flush_pending_rows_event(stmt_end);
-}
-
/* Dump replication buffer along with header to a file. */
void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
size_t buf_len)
@@ -529,3 +309,92 @@ cleanup1:
DBUG_VOID_RETURN;
}
+/*
+  Write an Ignorable_log_event constructed for thd through
+  mysql_bin_log.write_event() and, if that succeeds, commit the statement
+  with trans_commit_stmt().
+
+  @param thd  session the event is created for and whose statement is
+              committed
+
+  @return 0 on success, otherwise the non-zero result of the step that
+          failed (write_event() or trans_commit_stmt()).
+*/
+int wsrep_write_skip_event(THD* thd)
+{
+  DBUG_ENTER("wsrep_write_skip_event");
+  Ignorable_log_event skip_event(thd);
+  int ret= mysql_bin_log.write_event(&skip_event);
+  if (ret)
+  {
+    WSREP_WARN("wsrep_write_skip_event: write to binlog failed: %d", ret);
+  }
+  /* The statement commit is attempted only when write_event() returned 0. */
+  if (!ret && (ret= trans_commit_stmt(thd)))
+  {
+    WSREP_WARN("wsrep_write_skip_event: stmt commit failed");
+  }
+  DBUG_RETURN(ret);
+}
+
+/*
+  Stub: calls ::abort() unconditionally. Neither thd nor msg is used, and
+  the trailing return statement only satisfies the int return type.
+*/
+int wsrep_write_dummy_event_low(THD *thd, const char *msg)
+{
+ ::abort();
+ return 0;
+}
+
+/*
+  No-op: ignores orig_thd and msg and always returns 0.
+*/
+int wsrep_write_dummy_event(THD *orig_thd, const char *msg)
+{
+ return 0;
+}
+
+/*
+  Tell whether a commit on behalf of thd will write to the binary log.
+
+  True only when binlog emulation is off and thd is either a local session
+  or an applier (wsrep_applier_service set) with opt_log_slave_updates on.
+*/
+bool wsrep_commit_will_write_binlog(THD *thd)
+{
+  /* binlog not enabled */
+  if (wsrep_emulate_bin_log)
+    return false;
+
+  /* local thd */
+  if (wsrep_thd_is_local(thd))
+    return true;
+
+  /* applier and log-slave-updates */
+  return thd->wsrep_applier_service && opt_log_slave_updates;
+}
+
+/*
+ The last THD/wait_for_commit registered for group commit.
+*/
+static wait_for_commit *commit_order_tail= NULL;
+
+/*
+  Append thd to the wsrep group commit queue.
+
+  thd->wait_for_commit_ptr is pointed at thd->wsrep_wfc; under
+  LOCK_wsrep_group_commit that entry is registered to wait for the current
+  queue tail (if there is one) and then becomes the new tail. Nothing is
+  done when wsrep_emulate_bin_log is set.
+*/
+void wsrep_register_for_group_commit(THD *thd)
+{
+  DBUG_ENTER("wsrep_register_for_group_commit");
+
+  /* Binlog is off, no need to maintain group commit queue */
+  if (!wsrep_emulate_bin_log)
+  {
+    DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_committing);
+
+    wait_for_commit *const waiter= &thd->wsrep_wfc;
+    thd->wait_for_commit_ptr= waiter;
+
+    mysql_mutex_lock(&LOCK_wsrep_group_commit);
+    if (commit_order_tail != NULL)
+      waiter->register_wait_for_prior_commit(commit_order_tail);
+    commit_order_tail= waiter;
+    mysql_mutex_unlock(&LOCK_wsrep_group_commit);
+
+    /*
+      Now we have queued for group commit. If the commit will go
+      through TC log_and_order(), the commit ordering is done
+      by TC group commit. Otherwise the wait for prior
+      commits to complete is done in ha_commit_one_phase().
+    */
+  }
+
+  DBUG_VOID_RETURN;
+}
+
+/*
+  Remove thd from the wsrep group commit queue.
+
+  If thd has a wait_for_commit entry, then under LOCK_wsrep_group_commit
+  the entry is unregistered, subsequent commits are woken up and the queue
+  tail is cleared when it still points at this entry. Afterwards
+  thd->wait_for_commit_ptr is reset to NULL.
+*/
+void wsrep_unregister_from_group_commit(THD *thd)
+{
+  DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_ordered_commit);
+
+  wait_for_commit *const waiter= thd->wait_for_commit_ptr;
+  if (waiter == NULL)
+    return;
+
+  mysql_mutex_lock(&LOCK_wsrep_group_commit);
+  waiter->unregister_wait_for_prior_commit();
+  thd->wakeup_subsequent_commits(0);
+
+  /* The last one queued for group commit has completed commit, it is
+     safe to set tail to NULL. */
+  if (commit_order_tail == waiter)
+    commit_order_tail= NULL;
+  mysql_mutex_unlock(&LOCK_wsrep_group_commit);
+
+  thd->wait_for_commit_ptr= NULL;
+}