summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergei Petrunia <psergey@askmonty.org>2017-09-11 18:15:58 +0300
committerSergei Petrunia <psergey@askmonty.org>2017-09-11 18:15:58 +0300
commit8b1f9f8ee520e5cfb594bdf6e7e9ce7dc5f3c950 (patch)
treeb0e6110e66c51f83ac0f87d95f6642b5eb1410c2
parentc52ffbeba9e90fb5df04de450406734f2abc89be (diff)
downloadmariadb-git-bb-10.2-mdev11934.tar.gz
MDEV-11934: MariaRocks: Group Commit with binlogbb-10.2-mdev11934
Implement MariaDB's Group Commit API. This is a first attempt which lacks the expected performance.
-rw-r--r--storage/rocksdb/ha_rocksdb.cc107
1 files changed, 98 insertions, 9 deletions
diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc
index bf124584759..1ca029dc481 100644
--- a/storage/rocksdb/ha_rocksdb.cc
+++ b/storage/rocksdb/ha_rocksdb.cc
@@ -1683,6 +1683,12 @@ public:
int64_t m_n_mysql_tables_in_use = 0;
/*
+ MariaDB's group commit:
+ */
+ bool commit_ordered_done;
+ bool commit_ordered_res;
+
+ /*
for distinction between rdb_transaction_impl and rdb_writebatch_impl
when using walk tx list
*/
@@ -2299,6 +2305,8 @@ public:
THDVAR(m_thd, write_ignore_missing_column_families);
m_is_two_phase = rocksdb_enable_2pc;
+ commit_ordered_done= false;
+
/*
If m_rocksdb_reuse_tx is null this will create a new transaction object.
Otherwise it will reuse the existing one.
@@ -2511,6 +2519,7 @@ public:
bool is_tx_started() const override { return (m_batch != nullptr); }
void start_tx() override {
+ commit_ordered_done= false; // Do we need this here?
reset();
write_opts.sync = (rocksdb_flush_log_at_trx_commit == 1);
write_opts.disableWAL = THDVAR(m_thd, write_disable_wal);
@@ -2682,8 +2691,7 @@ static bool rocksdb_flush_wal(handlerton* hton __attribute__((__unused__)))
*/
static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
{
-// This is "ASYNC_COMMIT" feature which is only in webscalesql
- bool async=false;
+ bool async=false; // This is "ASYNC_COMMIT" feature which is only present in webscalesql
Rdb_transaction *&tx = get_tx_from_thd(thd);
if (!tx->can_prepare()) {
@@ -2693,7 +2701,8 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
(!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) {
/* We were instructed to prepare the whole transaction, or
this is an SQL statement end and autocommit is on */
-#ifdef MARIAROCKS_NOT_YET // disable prepare/commit
+
+#ifdef MARIAROCKS_NOT_YET // Crash-safe slave does not work yet
std::vector<st_slave_gtid_info> slave_gtid_info;
my_core::thd_slave_gtid_info(thd, &slave_gtid_info);
for (const auto &it : slave_gtid_info) {
@@ -2703,21 +2712,38 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
#endif
if (tx->is_two_phase()) {
+
+ /*
+ MariaDB: the following branch is never taken.
+ We always flush at Prepare and rely on RocksDB's internal Group Commit
+ to do some grouping.
+ */
if (thd->durability_property == HA_IGNORE_DURABILITY || async) {
tx->set_sync(false);
}
+
+ /*
+ MariaDB: do not flush logs if we are running in a non-crash-safe mode.
+ */
+ if (!rocksdb_flush_log_at_trx_commit)
+ tx->set_sync(false);
+
XID xid;
thd_get_xid(thd, reinterpret_cast<MYSQL_XID *>(&xid));
if (!tx->prepare(rdb_xid_to_string(xid))) {
return HA_EXIT_FAILURE;
}
- if (thd->durability_property == HA_IGNORE_DURABILITY )
+
+ /*
+ MariaDB: our Group Commit implementation does not use the
+ hton->flush_logs call (at least currently) so the following is not
+ needed (TODO: will we need this for binlog rotation?)
+ */
#ifdef MARIAROCKS_NOT_YET
+ if (thd->durability_property == HA_IGNORE_DURABILITY
&&
- THDVAR(thd, flush_log_at_trx_commit))
-#endif
+ rocksdb_flush_log_at_trx_commit)
{
-#ifdef MARIAROCKS_NOT_YET
// MariaRocks: disable the
// "write/sync redo log before flushing binlog cache to file"
// feature. See a869c56d361bb44f46c0efeb11a8f03561676247
@@ -2725,8 +2751,8 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
we set the log sequence as '1' just to trigger hton->flush_logs
*/
thd_store_lsn(thd, 1, DB_TYPE_ROCKSDB);
-#endif
}
+#endif
}
DEBUG_SYNC(thd, "rocksdb.prepared");
@@ -2876,6 +2902,50 @@ static int rocksdb_recover(handlerton* hton, XID* xid_list, uint len)
return count;
}
+
+/*
+ Handle a commit checkpoint request from server layer.
+
+ InnoDB does this:
+ We put the request in a queue, so that we can notify upper layer about
+ checkpoint complete when we have flushed the redo log.
+ If we have already flushed all relevant redo log, we notify immediately.
+
+ MariaRocks just flushes everything right away ATM
+*/
+
+static void rocksdb_checkpoint_request(handlerton *hton,
+ void *cookie)
+{
+ const rocksdb::Status s= rdb->SyncWAL();
+ //TODO: what to do on error?
+ if (s.ok())
+ {
+ rocksdb_wal_group_syncs++;
+ commit_checkpoint_notify_ha(hton, cookie);
+ }
+}
+
+/*
+ @param all: TRUE - commit the transaction
+ FALSE - SQL statement ended
+*/
+static void rocksdb_commit_ordered(handlerton *hton, THD* thd, bool all)
+{
+ // Same assert as InnoDB has
+ DBUG_ASSERT(all || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT |
+ OPTION_BEGIN)));
+ Rdb_transaction *&tx = get_tx_from_thd(thd);
+
+ tx->set_sync(false);
+
+ /* This will note the master position also */
+ tx->commit_ordered_res= tx->commit();
+ tx->commit_ordered_done= true;
+
+}
+
+
static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
{
DBUG_ENTER_FUNC();
@@ -2896,6 +2966,16 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
if (commit_tx || (!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT |
OPTION_BEGIN))) {
/*
+ This will not add anything to commit_latency_stats, and this is correct
+ right?
+ */
+ if (tx->commit_ordered_done)
+ {
+ thd_wakeup_subsequent_commits(thd, 0);
+ DBUG_RETURN((tx->commit_ordered_res? HA_ERR_INTERNAL_ERROR: 0));
+ }
+
+ /*
We get here
- For a COMMIT statement that finishes a multi-statement transaction
- For a statement that has its own transaction
@@ -2903,6 +2983,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
if (tx->commit()) {
DBUG_RETURN(HA_ERR_ROCKSDB_COMMIT_FAILED);
}
+ thd_wakeup_subsequent_commits(thd, 0);
} else {
/*
We get here when committing a statement within a transaction.
@@ -2926,6 +3007,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
DBUG_RETURN(HA_EXIT_SUCCESS);
}
+
static int rocksdb_rollback(handlerton *const hton, THD *const thd,
bool rollback_tx) {
Rdb_perf_context_guard guard(thd);
@@ -3642,11 +3724,19 @@ static int rocksdb_init_func(void *const p) {
rocksdb_hton->state = SHOW_OPTION_YES;
rocksdb_hton->create = rocksdb_create_handler;
rocksdb_hton->close_connection = rocksdb_close_connection;
+
rocksdb_hton->prepare = rocksdb_prepare;
+ rocksdb_hton->prepare_ordered = NULL; // Do not need it
+
rocksdb_hton->commit_by_xid = rocksdb_commit_by_xid;
rocksdb_hton->rollback_by_xid = rocksdb_rollback_by_xid;
rocksdb_hton->recover = rocksdb_recover;
+
+ rocksdb_hton->commit_ordered= rocksdb_commit_ordered;
rocksdb_hton->commit = rocksdb_commit;
+
+ rocksdb_hton->commit_checkpoint_request= rocksdb_checkpoint_request;
+
rocksdb_hton->rollback = rocksdb_rollback;
rocksdb_hton->show_status = rocksdb_show_status;
rocksdb_hton->start_consistent_snapshot =
@@ -3974,7 +4064,6 @@ static int rocksdb_init_func(void *const p) {
if (myrocks::rocksdb_wal_dir && *myrocks::rocksdb_wal_dir) {
directories.push_back(myrocks::rocksdb_wal_dir);
}
-
#ifndef _WIN32
io_watchdog = new Rdb_io_watchdog(directories);
io_watchdog->reset_timeout(rocksdb_io_write_timeout_secs);