summaryrefslogtreecommitdiff
path: root/storage/rocksdb/rdb_index_merge.cc
diff options
context:
space:
mode:
authorSergei Petrunia <psergey@askmonty.org>2016-10-06 17:24:09 +0000
committerSergei Petrunia <psergey@askmonty.org>2016-10-06 17:24:09 +0000
commitebfc4e6ad02b0cef34ec3f446007b98d85af9296 (patch)
tree1811d0c75aaf4aa3f130d3f35a86c769dcfadce4 /storage/rocksdb/rdb_index_merge.cc
downloadmariadb-git-ebfc4e6ad02b0cef34ec3f446007b98d85af9296.tar.gz
Initial commit,
copy of commit 86587affafe77ef555f7c3839839de44f0f203f3 Author: Tian Xia <tianx@fb.com> Date: Tue Oct 4 10:01:52 2016 -0700 Allow filtering of show commands through admission control
Diffstat (limited to 'storage/rocksdb/rdb_index_merge.cc')
-rw-r--r--storage/rocksdb/rdb_index_merge.cc604
1 files changed, 604 insertions, 0 deletions
diff --git a/storage/rocksdb/rdb_index_merge.cc b/storage/rocksdb/rdb_index_merge.cc
new file mode 100644
index 00000000000..dc85db4d356
--- /dev/null
+++ b/storage/rocksdb/rdb_index_merge.cc
@@ -0,0 +1,604 @@
+/*
+ Copyright (c) 2016, Facebook, Inc.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/* This C++ file's header file */
+#include "./rdb_index_merge.h"
+
+/* MyRocks header files */
+#include "./ha_rocksdb.h"
+#include "./rdb_datadic.h"
+
+namespace myrocks {
+
+Rdb_index_merge::Rdb_index_merge(const ulonglong merge_buf_size,
+ const ulonglong merge_combine_read_size,
+ const rocksdb::Comparator* comparator) :
+ m_merge_buf_size(merge_buf_size),
+ m_merge_combine_read_size(merge_combine_read_size),
+ m_comparator(comparator),
+ m_rec_buf_unsorted(nullptr),
+ m_output_buf(nullptr)
+{
+}
+
+Rdb_index_merge::~Rdb_index_merge()
+{
+ /*
+ Close tmp file, we don't need to worry about deletion, mysql handles it.
+ */
+ my_close(m_merge_file.fd, MYF(MY_WME));
+
+ /* There should be no records left in the offset tree */
+ DBUG_ASSERT(m_offset_tree.empty());
+
+ /* There should be no pointers left on the merge heap */
+ DBUG_ASSERT(m_merge_min_heap.empty());
+}
+
+int Rdb_index_merge::init()
+{
+ /*
+ Create a temporary merge file on disk to store sorted chunks during
+ inplace index creation.
+ */
+ if (merge_file_create())
+ {
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ /*
+ Then, allocate buffer to store unsorted records before they are written
+ to disk. They will be written to disk sorted. A sorted tree is used to
+ keep track of the offset of each record within the unsorted buffer.
+ */
+ m_rec_buf_unsorted= std::make_shared<merge_buf_info>(m_merge_buf_size);
+
+ /*
+ Allocate output buffer that will contain sorted block that is written to
+ disk.
+ */
+ m_output_buf= std::make_shared<merge_buf_info>(m_merge_buf_size);
+
+ return 0;
+}
+
+/**
+ Create a merge file in the given location.
+*/
+int Rdb_index_merge::merge_file_create()
+{
+ DBUG_ASSERT(m_merge_file.fd == -1);
+
+ int fd = mysql_tmpfile("myrocks");
+
+ if (fd < 0)
+ {
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ m_merge_file.fd = fd;
+ m_merge_file.num_sort_buffers = 0;
+
+ return 0;
+}
+
+/**
+ Add record to offset tree (and unsorted merge buffer) in preparation for
+ writing out to disk in sorted chunks.
+
+ If buffer in memory is full, write the buffer out to disk sorted using the
+ offset tree, and clear the tree. (Happens in merge_buf_write)
+*/
+int Rdb_index_merge::add(const rocksdb::Slice& key,
+ const rocksdb::Slice& val)
+{
+ /* Adding a record after heap is already created results in error */
+ DBUG_ASSERT(m_merge_min_heap.empty());
+
+ /*
+ Check if sort buffer is going to be out of space, if so write it
+ out to disk in sorted order using offset tree.
+ */
+ uint total_offset= RDB_MERGE_CHUNK_LEN + m_rec_buf_unsorted->curr_offset +
+ RDB_MERGE_KEY_DELIMITER + RDB_MERGE_VAL_DELIMITER +
+ key.size() + val.size();
+ if (total_offset >= m_rec_buf_unsorted->total_size)
+ {
+ if (merge_buf_write())
+ {
+ // NO_LINT_DEBUG
+ sql_print_error("Error writing sort buffer to disk.");
+ return HA_ERR_INTERNAL_ERROR;
+ }
+ }
+
+ ulonglong rec_offset= m_rec_buf_unsorted->curr_offset;
+
+ /*
+ Store key and value in temporary unsorted in memory buffer pointed to by
+ offset tree.
+ */
+ m_rec_buf_unsorted->store_key_value(key, val);
+
+ /* Find sort order of the new record */
+ m_offset_tree.emplace(m_rec_buf_unsorted->block.get() + rec_offset,
+ m_comparator);
+
+ return 0;
+}
+
+/**
+ Sort + write merge buffer chunk out to disk.
+*/
+int Rdb_index_merge::merge_buf_write()
+{
+ DBUG_ASSERT(m_merge_file.fd != -1);
+ DBUG_ASSERT(m_rec_buf_unsorted != nullptr);
+ DBUG_ASSERT(m_output_buf != nullptr);
+ DBUG_ASSERT(!m_offset_tree.empty());
+
+ /* Write actual chunk size to first 8 bytes of the merge buffer */
+ merge_store_uint64(m_output_buf->block.get(),
+ m_rec_buf_unsorted->curr_offset + RDB_MERGE_CHUNK_LEN);
+ m_output_buf->curr_offset += RDB_MERGE_CHUNK_LEN;
+
+ /*
+ Iterate through the offset tree. Should be ordered by the secondary key
+ at this point.
+ */
+ for (auto& rec : m_offset_tree)
+ {
+ DBUG_ASSERT(m_output_buf->curr_offset <= m_merge_buf_size);
+
+ /* Read record from offset (should never fail) */
+ rocksdb::Slice key;
+ rocksdb::Slice val;
+ merge_read_rec(rec.block, &key, &val);
+
+ /* Store key and value into sorted output buffer */
+ m_output_buf->store_key_value(key, val);
+ }
+
+ DBUG_ASSERT(m_output_buf->curr_offset <= m_output_buf->total_size);
+
+ /*
+ Write output buffer to disk.
+
+ Need to position cursor to the chunk it needs to be at on filesystem
+ then write into the respective merge buffer.
+ */
+ if (my_seek(m_merge_file.fd, m_merge_file.num_sort_buffers * m_merge_buf_size,
+ SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
+ {
+ // NO_LINT_DEBUG
+ sql_print_error("Error seeking to location in merge file on disk.");
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ if (my_write(m_merge_file.fd, m_output_buf->block.get(),
+ m_output_buf->total_size, MYF(MY_WME | MY_NABP)))
+ {
+ // NO_LINT_DEBUG
+ sql_print_error("Error writing sorted merge buffer to disk.");
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ /* Increment merge file offset to track number of merge buffers written */
+ m_merge_file.num_sort_buffers += 1;
+
+ /* Reset everything for next run */
+ merge_reset();
+
+ return 0;
+}
+
+/**
+ Prepare n-way merge of n sorted buffers on disk, using a heap sorted by
+ secondary key records.
+*/
+int Rdb_index_merge::merge_heap_prepare()
+{
+ DBUG_ASSERT(m_merge_min_heap.empty());
+
+ /*
+ If the offset tree is not empty, there are still some records that need to
+ be written to disk. Write them out now.
+ */
+ if (!m_offset_tree.empty() && merge_buf_write())
+ {
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ DBUG_ASSERT(m_merge_file.num_sort_buffers > 0);
+
+ /*
+ For an n-way merge, we need to read chunks of each merge file
+ simultaneously.
+ */
+ ulonglong chunk_size= m_merge_combine_read_size/
+ m_merge_file.num_sort_buffers;
+ if (chunk_size >= m_merge_buf_size)
+ {
+ chunk_size= m_merge_buf_size;
+ }
+
+ /* Allocate buffers for each chunk */
+ for (ulonglong i = 0; i < m_merge_file.num_sort_buffers; i++)
+ {
+ auto entry= std::make_shared<merge_heap_entry>(m_comparator);
+
+ /*
+ Read chunk_size bytes from each chunk on disk, and place inside
+ respective chunk buffer.
+ */
+ size_t total_size=
+ entry->prepare(m_merge_file.fd, i * m_merge_buf_size, chunk_size);
+
+ if (total_size == (size_t) - 1)
+ {
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ /* Can reach this condition if an index was added on table w/ no rows */
+ if (total_size - RDB_MERGE_CHUNK_LEN == 0)
+ {
+ break;
+ }
+
+ /* Read the first record from each buffer to initially populate the heap */
+ if (entry->read_rec(&entry->key, &entry->val))
+ {
+ // NO_LINT_DEBUG
+ sql_print_error("Chunk size is too small to process merge.");
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ m_merge_min_heap.push(std::move(entry));
+ }
+
+ return 0;
+}
+
+/**
+ Create and/or iterate through keys in the merge heap.
+*/
+int Rdb_index_merge::next(rocksdb::Slice* key, rocksdb::Slice* val)
+{
+ /*
+ If table fits in one sort buffer, we can optimize by writing
+ the sort buffer directly through to the sstfilewriter instead of
+ needing to create tmp files/heap to merge the sort buffers.
+
+ If there are no sort buffer records (alters on empty tables),
+ also exit here.
+ */
+ if (m_merge_file.num_sort_buffers == 0)
+ {
+ if (m_offset_tree.empty())
+ {
+ return -1;
+ }
+
+ auto rec= m_offset_tree.begin();
+
+ /* Read record from offset */
+ merge_read_rec(rec->block, key, val);
+
+ m_offset_tree.erase(rec);
+ return 0;
+ }
+
+ int res;
+
+ /*
+ If heap and heap chunk info are empty, we must be beginning the merge phase
+ of the external sort. Populate the heap with initial values from each
+ disk chunk.
+ */
+ if (m_merge_min_heap.empty())
+ {
+ if ((res= merge_heap_prepare()))
+ {
+ // NO_LINT_DEBUG
+ sql_print_error("Error during preparation of heap.");
+ return res;
+ }
+
+ /*
+ Return the first top record without popping, as we haven't put this
+ inside the SST file yet.
+ */
+ merge_heap_top(key, val);
+ return 0;
+ }
+
+ DBUG_ASSERT(!m_merge_min_heap.empty());
+ return merge_heap_pop_and_get_next(key, val);
+}
+
+/**
+ Get current top record from the heap.
+*/
+void Rdb_index_merge::merge_heap_top(rocksdb::Slice* key,
+ rocksdb::Slice* val)
+{
+ DBUG_ASSERT(!m_merge_min_heap.empty());
+
+ const std::shared_ptr<merge_heap_entry>& entry= m_merge_min_heap.top();
+ *key= entry->key;
+ *val= entry->val;
+}
+
+/**
+ Pops the top record, and uses it to read next record from the
+ corresponding sort buffer and push onto the heap.
+
+ Returns -1 when there are no more records in the heap.
+*/
+int Rdb_index_merge::merge_heap_pop_and_get_next(rocksdb::Slice* key,
+ rocksdb::Slice* val)
+{
+ /*
+ Make a new reference to shared ptr so it doesn't get destroyed
+ during pop(). We are going to push this entry back onto the heap.
+ */
+ const std::shared_ptr<merge_heap_entry> entry= m_merge_min_heap.top();
+ m_merge_min_heap.pop();
+
+ /*
+ We are finished w/ current chunk if:
+ current_offset + disk_offset == total_size
+
+ Return without adding entry back onto heap.
+ If heap is also empty, we must be finished with merge.
+ */
+ if (entry->chunk_info->is_chunk_finished())
+ {
+ if (m_merge_min_heap.empty())
+ {
+ return -1;
+ }
+
+ merge_heap_top(key, val);
+ return 0;
+ }
+
+ /*
+ Make sure we haven't reached the end of the chunk.
+ */
+ DBUG_ASSERT(!entry->chunk_info->is_chunk_finished());
+
+ /*
+ If merge_read_rec fails, it means the either the chunk was cut off
+ or we've reached the end of the respective chunk.
+ */
+ if (entry->read_rec(&entry->key, &entry->val))
+ {
+ if (entry->read_next_chunk_from_disk(m_merge_file.fd))
+ {
+ return HA_ERR_INTERNAL_ERROR;
+ }
+
+ /* Try reading record again, should never fail. */
+ if (entry->read_rec(&entry->key, &entry->val))
+ {
+ return HA_ERR_INTERNAL_ERROR;
+ }
+ }
+
+ /* Push entry back on to the heap w/ updated buffer + offset ptr */
+ m_merge_min_heap.push(std::move(entry));
+
+ /* Return the current top record on heap */
+ merge_heap_top(key, val);
+ return 0;
+}
+
+int Rdb_index_merge::merge_heap_entry::read_next_chunk_from_disk(File fd)
+{
+ if (chunk_info->read_next_chunk_from_disk(fd))
+ {
+ return 1;
+ }
+
+ block= chunk_info->block.get();
+ return 0;
+}
+
+int Rdb_index_merge::merge_buf_info::read_next_chunk_from_disk(File fd)
+{
+ disk_curr_offset += curr_offset;
+
+ if (my_seek(fd, disk_curr_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
+ {
+ // NO_LINT_DEBUG
+ sql_print_error("Error seeking to location in merge file on disk.");
+ return 1;
+ }
+
+ /* Overwrite the old block */
+ size_t bytes_read= my_read(fd, block.get(), block_len, MYF(MY_WME));
+ if (bytes_read == (size_t) -1)
+ {
+ // NO_LINT_DEBUG
+ sql_print_error("Error reading merge file from disk.");
+ return 1;
+ }
+
+ curr_offset= 0;
+ return 0;
+}
+
+/**
+ Get records from offset within sort buffer and compare them.
+ Sort by least to greatest.
+*/
+int Rdb_index_merge::merge_record_compare(const uchar* a_block,
+ const uchar* b_block,
+ const rocksdb::Comparator* const comparator)
+{
+ return comparator->Compare(as_slice(a_block), as_slice(b_block));
+}
+
+/**
+ Given an offset in a merge sort buffer, read out the keys + values.
+ After this, block will point to the next record in the buffer.
+**/
+void Rdb_index_merge::merge_read_rec(const uchar* block,
+ rocksdb::Slice* key,
+ rocksdb::Slice* val)
+{
+ /* Read key at block offset into key slice and the value into value slice*/
+ read_slice(key, block);
+ read_slice(val, block + RDB_MERGE_REC_DELIMITER + key->size());
+}
+
+void Rdb_index_merge::read_slice(rocksdb::Slice* slice, const uchar* block_ptr)
+{
+ uint64 slice_len;
+ merge_read_uint64(&block_ptr, &slice_len);
+
+ *slice= rocksdb::Slice(reinterpret_cast<const char*>(block_ptr), slice_len);
+}
+
+int Rdb_index_merge::merge_heap_entry::read_rec(rocksdb::Slice *key,
+ rocksdb::Slice *val)
+{
+ const uchar* block_ptr= block;
+
+ /* Read key at block offset into key slice and the value into value slice*/
+ if (read_slice(key, &block_ptr) != 0 || read_slice(val, &block_ptr) != 0)
+ {
+ return 1;
+ }
+
+ chunk_info->curr_offset += (uintptr_t) block_ptr - (uintptr_t) block;
+ block += (uintptr_t) block_ptr - (uintptr_t) block;
+
+ return 0;
+}
+
+int Rdb_index_merge::merge_heap_entry::read_slice(rocksdb::Slice* slice,
+ const uchar** block_ptr)
+{
+ if (!chunk_info->has_space(RDB_MERGE_REC_DELIMITER))
+ {
+ return 1;
+ }
+
+ uint64 slice_len;
+ merge_read_uint64(block_ptr, &slice_len);
+ if (!chunk_info->has_space(RDB_MERGE_REC_DELIMITER + slice_len))
+ {
+ return 1;
+ }
+
+ *slice= rocksdb::Slice(reinterpret_cast<const char*>(*block_ptr), slice_len);
+ *block_ptr += slice_len;
+ return 0;
+}
+
+size_t Rdb_index_merge::merge_heap_entry::prepare(File fd, ulonglong f_offset,
+ ulonglong chunk_size)
+{
+ chunk_info= std::make_shared<merge_buf_info>(chunk_size);
+ size_t res = chunk_info->prepare(fd, f_offset);
+ if (res != (size_t) - 1)
+ {
+ block= chunk_info->block.get() + RDB_MERGE_CHUNK_LEN;
+ }
+
+ return res;
+}
+
+size_t Rdb_index_merge::merge_buf_info::prepare(File fd, ulonglong f_offset)
+{
+ disk_start_offset= f_offset;
+ disk_curr_offset= f_offset;
+
+ /*
+ Need to position cursor to the chunk it needs to be at on filesystem
+ then read 'chunk_size' bytes into the respective chunk buffer.
+ */
+ if (my_seek(fd, f_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
+ {
+ // NO_LINT_DEBUG
+ sql_print_error("Error seeking to location in merge file on disk.");
+ return (size_t) - 1;
+ }
+
+ size_t bytes_read= my_read(fd, block.get(), total_size, MYF(MY_WME));
+ if (bytes_read == (size_t) - 1)
+ {
+ // NO_LINT_DEBUG
+ sql_print_error("Error reading merge file from disk.");
+ return (size_t) - 1;
+ }
+
+ /*
+ Read the first 8 bytes of each chunk, this gives us the actual
+ size of each chunk.
+ */
+ const uchar *block_ptr= block.get();
+ merge_read_uint64(&block_ptr, &total_size);
+ curr_offset += RDB_MERGE_CHUNK_LEN;
+ return total_size;
+}
+
+/* Store key and value w/ their respective delimiters at the given offset */
+void Rdb_index_merge::merge_buf_info::store_key_value(
+ const rocksdb::Slice& key, const rocksdb::Slice& val)
+{
+ store_slice(key);
+ store_slice(val);
+}
+
+void Rdb_index_merge::merge_buf_info::store_slice(const rocksdb::Slice& slice)
+{
+ /* Store length delimiter */
+ merge_store_uint64(&block[curr_offset], slice.size());
+
+ /* Store slice data */
+ memcpy(&block[curr_offset + RDB_MERGE_REC_DELIMITER], slice.data(),
+ slice.size());
+
+ curr_offset += slice.size() + RDB_MERGE_REC_DELIMITER;
+}
+
+
+void Rdb_index_merge::merge_reset()
+{
+ /*
+ Either error, or all values in the sort buffer have been written to disk,
+ so we need to clear the offset tree.
+ */
+ m_offset_tree.clear();
+
+ /* Reset sort buffer block */
+ if (m_rec_buf_unsorted && m_rec_buf_unsorted->block)
+ {
+ m_rec_buf_unsorted->curr_offset= 0;
+ }
+
+ /* Reset output buf */
+ if (m_output_buf && m_output_buf->block)
+ {
+ m_output_buf->curr_offset= 0;
+ }
+}
+
+} // namespace myrocks
+