/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.
Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

// NOTE: the original include targets were lost in transit; these are inferred
// from what this file uses (memcpy, DB_* error codes, XMALLOC_N/toku_free, std::vector).
#include <string.h>
#include <db.h>
#include <portability/memory.h>
#include <vector>

namespace toku {

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::create(void) {
    toku_mempool_zero(&this->mp);
    this->values_same_size = true;
    this->value_length = 0;
    this->is_array = true;
    this->d.a.num_values = 0;
    //TODO: maybe allocate enough space for something by default?
    //      We may be relying on not needing to allocate space the first time
    //      (due to limited time spent while a lock is held)
}
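/*
 * Reader's note (illustration, not part of the upstream source): a dmt stores an
 * ordered sequence of values in one of two physical forms, selected by 'is_array'.
 * Freshly created dmts use the dense array form (fixed-size values packed at
 * aligned offsets in the mempool); the first variable-size or out-of-order insert
 * converts it to the weighted-tree form.  For example, assuming ALIGNMENT == 4 and
 * value_length == 6, each value occupies align(6) == 8 bytes in array form, so
 * value i lives at mempool offset 8*i and get_fixed_length_alignment_overhead()
 * reports 2 pad bytes per value.
 */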
/**
 * Note: create_from_sorted_memory_of_fixed_size_elements does not take ownership of 'mem'.
 * Owner is still responsible for freeing it.
 * While in the OMT a similar function would steal ownership, this doesn't make sense for the
 * DMT because we (usually) have to add padding for alignment (mem has all of the elements PACKED).
 * Also all current uses (as of Jan 12, 2014) of this function would require mallocing a new array
 * in order to allow stealing.
 */
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::create_from_sorted_memory_of_fixed_size_elements(
    const void *mem,
    const uint32_t numvalues,
    const uint32_t mem_length,
    const uint32_t fixed_value_length) {
    this->values_same_size = true;
    this->value_length = fixed_value_length;
    this->is_array = true;
    this->d.a.num_values = numvalues;
    const uint8_t pad_bytes = get_fixed_length_alignment_overhead();
    uint32_t aligned_memsize = mem_length + numvalues * pad_bytes;
    toku_mempool_construct(&this->mp, aligned_memsize);
    if (aligned_memsize > 0) {
        paranoid_invariant(numvalues > 0);
        void *ptr = toku_mempool_malloc(&this->mp, aligned_memsize);
        paranoid_invariant_notnull(ptr);
        uint8_t * const CAST_FROM_VOIDP(dest, ptr);
        const uint8_t * const CAST_FROM_VOIDP(src, mem);
        if (pad_bytes == 0) {
            paranoid_invariant(aligned_memsize == mem_length);
            memcpy(dest, src, aligned_memsize);
        } else {
            // TODO(leif): check what vectorizes best: multiplying like this or adding to offsets
            const uint32_t fixed_len = this->value_length;
            const uint32_t fixed_aligned_len = align(this->value_length);
            paranoid_invariant(this->d.a.num_values * fixed_len == mem_length);
            for (uint32_t i = 0; i < this->d.a.num_values; i++) {
                memcpy(&dest[i * fixed_aligned_len], &src[i * fixed_len], fixed_len);
            }
        }
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::clone(const dmt &src) {
    *this = src;
    toku_mempool_clone(&src.mp, &this->mp);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::clear(void) {
    this->is_array = true;
    this->d.a.num_values = 0;
    this->values_same_size = true;  // Reset state
    this->value_length = 0;
    //TODO(leif): Note that this can mess with our memory_footprint calculation (we may touch
    //            past what is marked as 'used' in the mempool)
    //            One 'fix' is for mempool to also track what was touched, and reset() shouldn't
    //            reset that, though realloc() might.
    toku_mempool_reset(&this->mp);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::destroy(void) {
    this->clear();
    toku_mempool_destroy(&this->mp);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::size(void) const {
    if (this->is_array) {
        return this->d.a.num_values;
    } else {
        return this->nweight(this->d.t.root);
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::nweight(const subtree &subtree) const {
    if (subtree.is_null()) {
        return 0;
    } else {
        const dmt_node & node = get_node(subtree);
        return node.weight;
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename dmtcmp_t, int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert(const dmtwriter_t &value, const dmtcmp_t &v, uint32_t *const idx) {
    int r;
    uint32_t insert_idx;

    r = this->find_zero<dmtcmp_t, h>(v, nullptr, nullptr, &insert_idx);
    if (r == 0) {
        if (idx) *idx = insert_idx;
        return DB_KEYEXIST;
    }
    if (r != DB_NOTFOUND) return r;

    if ((r = this->insert_at(value, insert_idx))) return r;
    if (idx) *idx = insert_idx;

    return 0;
}
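/*
 * Usage sketch for insert() (hypothetical example, not upstream code).  The caller
 * supplies a heaviside function 'h' that orders stored values against a search key
 * by returning <0, 0, or >0.  Assuming a dmt of uint32_t keys and a hypothetical
 * helper writer_for() that builds a dmtwriter_t:
 *
 *   static int cmp_u32(const uint32_t UU(size), const uint32_t &stored, const uint32_t &key) {
 *       return (stored < key) ? -1 : (stored > key) ? +1 : 0;
 *   }
 *   ...
 *   uint32_t idx;
 *   int r = m.insert<uint32_t, cmp_u32>(writer_for(42), 42, &idx);
 *   // r == 0 on success (idx is the new position), DB_KEYEXIST if 42 was present.
 */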
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert_at(const dmtwriter_t &value, const uint32_t idx) {
    if (idx > this->size()) { return EINVAL; }

    bool same_size = this->values_same_size && (this->size() == 0 || value.get_size() == this->value_length);
    if (this->is_array) {
        if (same_size && idx == this->d.a.num_values) {
            return this->insert_at_array_end<true>(value);
        }
        this->convert_from_array_to_tree();
    }
    // Is a tree.
    paranoid_invariant(!is_array);
    if (!same_size) {
        this->values_same_size = false;
        this->value_length = 0;
    }

    this->maybe_resize_tree(&value);
    subtree *rebalance_subtree = nullptr;
    this->insert_internal(&this->d.t.root, value, idx, &rebalance_subtree);
    if (rebalance_subtree != nullptr) {
        this->rebalance(rebalance_subtree);
    }
    return 0;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<bool with_resize>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert_at_array_end(const dmtwriter_t& value_in) {
    paranoid_invariant(this->is_array);
    paranoid_invariant(this->values_same_size);
    if (this->d.a.num_values == 0) {
        this->value_length = value_in.get_size();
    }
    paranoid_invariant(this->value_length == value_in.get_size());

    if (with_resize) {
        this->maybe_resize_array_for_insert();
    }
    dmtdata_t *dest = this->alloc_array_value_end();
    value_in.write_to(dest);
    return 0;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
dmtdata_t * dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::alloc_array_value_end(void) {
    paranoid_invariant(this->is_array);
    paranoid_invariant(this->values_same_size);
    this->d.a.num_values++;

    void *ptr = toku_mempool_malloc(&this->mp, align(this->value_length));
    paranoid_invariant_notnull(ptr);
    paranoid_invariant(reinterpret_cast<size_t>(ptr) % ALIGNMENT == 0);
    dmtdata_t *CAST_FROM_VOIDP(n, ptr);
    paranoid_invariant(n == get_array_value(this->d.a.num_values - 1));
    return n;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
dmtdata_t * dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_array_value(const uint32_t idx) const {
    paranoid_invariant(this->is_array);
    paranoid_invariant(this->values_same_size);
    paranoid_invariant(idx < this->d.a.num_values);
    return get_array_value_internal(&this->mp, idx);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
dmtdata_t * dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_array_value_internal(const struct mempool *mempool, const uint32_t idx) const {
    void* ptr = toku_mempool_get_pointer_from_base_and_offset(mempool, idx * align(this->value_length));
    dmtdata_t *CAST_FROM_VOIDP(value, ptr);
    return value;
}

//TODO(leif): write microbenchmarks to compare growth factor.
//            Note: growth factor here is actually 2.5 because of mempool_construct
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::maybe_resize_array_for_insert(void) {
    bool space_available = toku_mempool_get_free_size(&this->mp) >= align(this->value_length);

    if (!space_available) {
        const uint32_t n = this->d.a.num_values + 1;
        const uint32_t new_n = n <= 2 ? 4 : 2 * n;
        const uint32_t new_space = align(this->value_length) * new_n;

        struct mempool new_kvspace;
        toku_mempool_construct(&new_kvspace, new_space);
        size_t copy_bytes = this->d.a.num_values * align(this->value_length);
        invariant(copy_bytes + align(this->value_length) <= new_space);
        paranoid_invariant(copy_bytes <= toku_mempool_get_used_size(&this->mp));
        // Copy over to new mempool
        if (this->d.a.num_values > 0) {
            void* dest = toku_mempool_malloc(&new_kvspace, copy_bytes);
            invariant(dest != nullptr);
            memcpy(dest, get_array_value(0), copy_bytes);
        }
        toku_mempool_destroy(&this->mp);
        this->mp = new_kvspace;
    }
}
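/*
 * Worked example of the growth factor noted in the TODO above (illustration only):
 * appending the 9th value into a full mempool gives n = 9, new_n = 18, so we ask
 * for room for 18 slots to hold 9 values (2x).  toku_mempool_construct() itself
 * over-allocates by 25% (see the "Adds 25%" note in builder::create below), which
 * is where the effective 2.5x growth factor comes from.
 */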
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::align(const uint32_t x) const {
    return roundup_to_multiple(ALIGNMENT, x);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::prepare_for_serialize(void) {
    if (!this->is_array) {
        this->convert_from_tree_to_array();
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::convert_from_tree_to_array(void) {
    paranoid_invariant(!this->is_array);
    paranoid_invariant(this->values_same_size);

    const uint32_t num_values = this->size();

    node_offset *tmp_array;
    bool malloced = false;
    tmp_array = alloc_temp_node_offsets(num_values);
    if (!tmp_array) {
        malloced = true;
        XMALLOC_N(num_values, tmp_array);
    }
    this->fill_array_with_subtree_offsets(tmp_array, this->d.t.root);

    struct mempool new_mp;
    const uint32_t fixed_len = this->value_length;
    const uint32_t fixed_aligned_len = align(this->value_length);
    size_t mem_needed = num_values * fixed_aligned_len;
    toku_mempool_construct(&new_mp, mem_needed);
    uint8_t* CAST_FROM_VOIDP(dest, toku_mempool_malloc(&new_mp, mem_needed));
    paranoid_invariant_notnull(dest);
    for (uint32_t i = 0; i < num_values; i++) {
        const dmt_node &n = get_node(tmp_array[i]);
        memcpy(&dest[i * fixed_aligned_len], &n.value, fixed_len);
    }
    toku_mempool_destroy(&this->mp);
    this->mp = new_mp;
    this->is_array = true;
    this->d.a.num_values = num_values;

    if (malloced) toku_free(tmp_array);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::convert_from_array_to_tree(void) {
    paranoid_invariant(this->is_array);
    paranoid_invariant(this->values_same_size);

    //save array-format information to locals
    const uint32_t num_values = this->d.a.num_values;

    node_offset *tmp_array;
    bool malloced = false;
    tmp_array = alloc_temp_node_offsets(num_values);
    if (!tmp_array) {
        malloced = true;
        XMALLOC_N(num_values, tmp_array);
    }

    struct mempool old_mp = this->mp;
    size_t mem_needed = num_values * align(this->value_length + __builtin_offsetof(dmt_node, value));
    toku_mempool_construct(&this->mp, mem_needed);

    for (uint32_t i = 0; i < num_values; i++) {
        dmtwriter_t writer(this->value_length, get_array_value_internal(&old_mp, i));
        tmp_array[i] = node_malloc_and_set_value(writer);
    }
    this->is_array = false;
    this->rebuild_subtree_from_offsets(&this->d.t.root, tmp_array, num_values);

    if (malloced) toku_free(tmp_array);
    toku_mempool_destroy(&old_mp);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::delete_at(const uint32_t idx) {
    uint32_t n = this->size();
    if (idx >= n) { return EINVAL; }

    if (n == 1) {
        this->clear();  //Emptying out the entire dmt.
        return 0;
    }
    if (this->is_array) {
        this->convert_from_array_to_tree();
    }
    paranoid_invariant(!is_array);

    subtree *rebalance_subtree = nullptr;
    this->delete_internal(&this->d.t.root, idx, nullptr, &rebalance_subtree);
    if (rebalance_subtree != nullptr) {
        this->rebalance(rebalance_subtree);
    }
    this->maybe_resize_tree(nullptr);
    return 0;
}
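/*
 * Illustration of the node sizing used by the conversions above (not upstream
 * text): in tree form each value is wrapped in a dmt_node carrying a weight, two
 * subtree links, and value_length ahead of the value bytes, and each node is
 * allocated at align(offsetof(dmt_node, value) + value_length).  E.g., assuming
 * ALIGNMENT == 4 and offsetof(dmt_node, value) == 16, a 5-byte value consumes
 * align(21) == 24 bytes in tree form versus align(5) == 8 bytes in array form;
 * that header and padding overhead is what convert_from_tree_to_array() strips.
 */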
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename iterate_extra_t, int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate(iterate_extra_t *const iterate_extra) const {
    return this->iterate_on_range<iterate_extra_t, f>(0, this->size(), iterate_extra);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename iterate_extra_t, int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_on_range(const uint32_t left, const uint32_t right, iterate_extra_t *const iterate_extra) const {
    if (right > this->size()) { return EINVAL; }
    if (left == right) { return 0; }
    if (this->is_array) {
        return this->iterate_internal_array<iterate_extra_t, f>(left, right, iterate_extra);
    }
    return this->iterate_internal<iterate_extra_t, f>(left, right, this->d.t.root, 0, iterate_extra);
}
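/*
 * Usage sketch for iterate()/iterate_on_range() (hypothetical, not upstream code).
 * The callback receives (value_length, value, index, extra) and stops iteration by
 * returning nonzero; that return value is passed through to the caller.  Assuming
 * a dmt of uint32_t values:
 *
 *   struct sum_extra { uint64_t sum; };
 *   static int sum_cb(const uint32_t UU(size), const uint32_t &v,
 *                     const uint32_t UU(idx), sum_extra *const e) {
 *       e->sum += v;
 *       return 0;  // keep going
 *   }
 *   ...
 *   sum_extra e = { 0 };
 *   int r = m.iterate<sum_extra, sum_cb>(&e);
 */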
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::verify(void) const {
    uint32_t num_values = this->size();
    invariant(num_values < UINT32_MAX);
    size_t pool_used = toku_mempool_get_used_size(&this->mp);
    size_t pool_size = toku_mempool_get_size(&this->mp);
    size_t pool_frag = toku_mempool_get_frag_size(&this->mp);
    invariant(pool_used <= pool_size);
    if (this->is_array) {
        invariant(this->values_same_size);
        invariant(num_values == this->d.a.num_values);

        // We know exactly how much memory should be used.
        invariant(pool_used == num_values * align(this->value_length));

        // Array form must have 0 fragmentation in mempool.
        invariant(pool_frag == 0);
    } else {
        if (this->values_same_size) {
            // We know exactly how much memory should be used.
            invariant(pool_used == num_values * align(this->value_length + __builtin_offsetof(dmt_node, value)));
        } else {
            // We can only do a lower bound on memory usage.
            invariant(pool_used >= num_values * __builtin_offsetof(dmt_node, value));
        }
        std::vector<bool> touched(pool_size, false);
        verify_internal(this->d.t.root, &touched);
        size_t bytes_used = 0;
        for (size_t i = 0; i < pool_size; i++) {
            if (touched.at(i)) {
                ++bytes_used;
            }
        }
        invariant(bytes_used == pool_used);
    }
}

// Verifies all weights are internally consistent.
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::verify_internal(const subtree &subtree, std::vector<bool> *touched) const {
    if (subtree.is_null()) {
        return;
    }
    const dmt_node &node = get_node(subtree);

    if (this->values_same_size) {
        invariant(node.value_length == this->value_length);
    }

    size_t offset = toku_mempool_get_offset_from_pointer_and_base(&this->mp, &node);
    size_t node_size = align(__builtin_offsetof(dmt_node, value) + node.value_length);
    invariant(offset <= touched->size());
    invariant(offset + node_size <= touched->size());
    invariant(offset % ALIGNMENT == 0);
    // Mark memory as touched and never allocated to multiple nodes.
    for (size_t i = offset; i < offset + node_size; ++i) {
        invariant(!touched->at(i));
        touched->at(i) = true;
    }

    const uint32_t leftweight = this->nweight(node.left);
    const uint32_t rightweight = this->nweight(node.right);

    invariant(leftweight + rightweight + 1 == this->nweight(subtree));
    verify_internal(node.left, touched);
    verify_internal(node.right, touched);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename iterate_extra_t, int (*f)(const uint32_t, dmtdata_t *, const uint32_t, iterate_extra_t *const)>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_ptr(iterate_extra_t *const iterate_extra) {
    if (this->is_array) {
        this->iterate_ptr_internal_array<iterate_extra_t, f>(0, this->size(), iterate_extra);
    } else {
        this->iterate_ptr_internal<iterate_extra_t, f>(0, this->size(), this->d.t.root, 0, iterate_extra);
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fetch(const uint32_t idx, uint32_t *const value_len, dmtdataout_t *const value) const {
    if (idx >= this->size()) { return EINVAL; }
    if (this->is_array) {
        this->fetch_internal_array(idx, value_len, value);
    } else {
        this->fetch_internal(this->d.t.root, idx, value_len, value);
    }
    return 0;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename dmtcmp_t, int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_zero(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
    uint32_t tmp_index;
    uint32_t *const child_idxp = (idxp != nullptr) ? idxp : &tmp_index;
    int r;
    if (this->is_array) {
        r = this->find_internal_zero_array<dmtcmp_t, h>(extra, value_len, value, child_idxp);
    } else {
        r = this->find_internal_zero<dmtcmp_t, h>(this->d.t.root, extra, value_len, value, child_idxp);
    }
    return r;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename dmtcmp_t, int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find(const dmtcmp_t &extra, int direction, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
    uint32_t tmp_index;
    uint32_t *const child_idxp = (idxp != nullptr) ? idxp : &tmp_index;
    paranoid_invariant(direction != 0);
    if (direction < 0) {
        if (this->is_array) {
            return this->find_internal_minus_array<dmtcmp_t, h>(extra, value_len, value, child_idxp);
        } else {
            return this->find_internal_minus<dmtcmp_t, h>(this->d.t.root, extra, value_len, value, child_idxp);
        }
    } else {
        if (this->is_array) {
            return this->find_internal_plus_array<dmtcmp_t, h>(extra, value_len, value, child_idxp);
        } else {
            return this->find_internal_plus<dmtcmp_t, h>(this->d.t.root, extra, value_len, value, child_idxp);
        }
    }
}
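/*
 * Note on find() direction semantics, inferred from the find_internal_* helpers
 * below (illustration, not upstream text): with direction > 0, find() returns the
 * leftmost value for which h() > 0; with direction < 0, it returns the rightmost
 * value for which h() < 0.  find_zero() returns a value where h() == 0 if one
 * exists; otherwise it returns DB_NOTFOUND and sets *idxp to the index at which
 * such a value would be inserted.
 */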
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
size_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::memory_size(void) {
    return (sizeof *this) + toku_mempool_get_size(&this->mp);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
dmt_node_templated<dmtdata_t> & dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_node(const subtree &subtree) const {
    paranoid_invariant(!subtree.is_null());
    return get_node(subtree.get_offset());
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
dmt_node_templated<dmtdata_t> & dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_node(const node_offset offset) const {
    void* ptr = toku_mempool_get_pointer_from_base_and_offset(&this->mp, offset);
    dmt_node *CAST_FROM_VOIDP(node, ptr);
    return *node;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::node_set_value(dmt_node * n, const dmtwriter_t &value) {
    n->value_length = value.get_size();
    value.write_to(&n->value);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
node_offset dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::node_malloc_and_set_value(const dmtwriter_t &value) {
    size_t val_size = value.get_size();
    size_t size_to_alloc = __builtin_offsetof(dmt_node, value) + val_size;
    size_to_alloc = align(size_to_alloc);
    void* np = toku_mempool_malloc(&this->mp, size_to_alloc);
    paranoid_invariant_notnull(np);
    dmt_node *CAST_FROM_VOIDP(n, np);
    node_set_value(n, value);
    return toku_mempool_get_offset_from_pointer_and_base(&this->mp, np);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::node_free(const subtree &st) {
    dmt_node &n = get_node(st);
    size_t size_to_free = __builtin_offsetof(dmt_node, value) + n.value_length;
    size_to_free = align(size_to_free);
    toku_mempool_mfree(&this->mp, &n, size_to_free);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::maybe_resize_tree(const dmtwriter_t * value) {
    const ssize_t curr_capacity = toku_mempool_get_size(&this->mp);
    const ssize_t curr_free = toku_mempool_get_free_size(&this->mp);
    const ssize_t curr_used = toku_mempool_get_used_size(&this->mp);
    ssize_t add_size = 0;
    if (value) {
        add_size = __builtin_offsetof(dmt_node, value) + value->get_size();
        add_size = align(add_size);
    }

    const ssize_t need_size = curr_used + add_size;
    paranoid_invariant(need_size <= UINT32_MAX);
    //TODO(leif): consider different growth rates
    const ssize_t new_size = 2 * need_size;
    paranoid_invariant(new_size <= UINT32_MAX);

    if ((curr_capacity / 2 >= new_size) ||  // Way too much allocated
        (curr_free < add_size)) {           // No room in mempool
        // Copy all memory and reconstruct dmt in new mempool.
        if (curr_free < add_size && toku_mempool_get_frag_size(&this->mp) == 0) {
            // TODO(yoni) or TODO(leif): consider doing this not just when frag size is zero, but
            //                           also when it is a small percentage of the total mempool size
            // Offsets remain the same in the new mempool so we can just realloc.
            toku_mempool_realloc_larger(&this->mp, new_size);
        } else if (!this->d.t.root.is_null()) {
            struct mempool new_kvspace;
            toku_mempool_construct(&new_kvspace, new_size);

            const dmt_node &n = get_node(this->d.t.root);
            node_offset *tmp_array;
            bool malloced = false;
            tmp_array = alloc_temp_node_offsets(n.weight);
            if (!tmp_array) {
                malloced = true;
                XMALLOC_N(n.weight, tmp_array);
            }
            this->fill_array_with_subtree_offsets(tmp_array, this->d.t.root);
            for (node_offset i = 0; i < n.weight; i++) {
                dmt_node &node = get_node(tmp_array[i]);
                const size_t bytes_to_copy = __builtin_offsetof(dmt_node, value) + node.value_length;
                const size_t bytes_to_alloc = align(bytes_to_copy);
                void* newdata = toku_mempool_malloc(&new_kvspace, bytes_to_alloc);
                memcpy(newdata, &node, bytes_to_copy);
                tmp_array[i] = toku_mempool_get_offset_from_pointer_and_base(&new_kvspace, newdata);
            }

            struct mempool old_kvspace = this->mp;
            this->mp = new_kvspace;
            this->rebuild_subtree_from_offsets(&this->d.t.root, tmp_array, n.weight);
            if (malloced) toku_free(tmp_array);
            toku_mempool_destroy(&old_kvspace);
        } else {
            toku_mempool_destroy(&this->mp);
            toku_mempool_construct(&this->mp, new_size);
        }
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
bool dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::will_need_rebalance(const subtree &subtree, const int leftmod, const int rightmod) const {
    if (subtree.is_null()) { return false; }
    const dmt_node &n = get_node(subtree);
    // one of the 1's is for the root.
    // the other is to take ceil(n/2)
    const uint32_t weight_left = this->nweight(n.left) + leftmod;
    const uint32_t weight_right = this->nweight(n.right) + rightmod;
    return ((1 + weight_left < (1 + 1 + weight_right) / 2)
            ||
            (1 + weight_right < (1 + 1 + weight_left) / 2));
}
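/*
 * Worked example of the imbalance test above (illustration only): suppose a node
 * has nweight(left) == 1 and nweight(right) == 5 after applying the mods.  Then
 * 1 + 1 == 2 < (1 + 1 + 5)/2 == 3, so the subtree is flagged for rebalancing.
 * With weights 2 and 4, 1 + 2 == 3 is not less than (1 + 1 + 4)/2 == 3 (and the
 * mirror test also fails), so the subtree is left alone.
 */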
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::insert_internal(subtree *const subtreep, const dmtwriter_t &value, const uint32_t idx, subtree **const rebalance_subtree) {
    if (subtreep->is_null()) {
        paranoid_invariant_zero(idx);
        const node_offset newoffset = this->node_malloc_and_set_value(value);
        dmt_node &newnode = get_node(newoffset);
        newnode.weight = 1;
        newnode.left.set_to_null();
        newnode.right.set_to_null();
        subtreep->set_offset(newoffset);
    } else {
        dmt_node &n = get_node(*subtreep);
        n.weight++;
        if (idx <= this->nweight(n.left)) {
            if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 1, 0)) {
                *rebalance_subtree = subtreep;
            }
            this->insert_internal(&n.left, value, idx, rebalance_subtree);
        } else {
            if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 0, 1)) {
                *rebalance_subtree = subtreep;
            }
            const uint32_t sub_index = idx - this->nweight(n.left) - 1;
            this->insert_internal(&n.right, value, sub_index, rebalance_subtree);
        }
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::delete_internal(subtree *const subtreep, const uint32_t idx, subtree *const subtree_replace, subtree **const rebalance_subtree) {
    paranoid_invariant_notnull(subtreep);
    paranoid_invariant_notnull(rebalance_subtree);
    paranoid_invariant(!subtreep->is_null());
    dmt_node &n = get_node(*subtreep);
    const uint32_t leftweight = this->nweight(n.left);
    if (idx < leftweight) {
        n.weight--;
        if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, -1, 0)) {
            *rebalance_subtree = subtreep;
        }
        this->delete_internal(&n.left, idx, subtree_replace, rebalance_subtree);
    } else if (idx == leftweight) {
        // Found the correct index.
        if (n.left.is_null()) {
            paranoid_invariant_zero(idx);
            // Delete n and let parent point to n.right
            subtree ptr_this = *subtreep;
            *subtreep = n.right;
            subtree to_free;
            if (subtree_replace != nullptr) {
                // Swap self with the other node.  Taking over all responsibility.
                to_free = *subtree_replace;
                dmt_node &ancestor = get_node(*subtree_replace);
                if (*rebalance_subtree == &ancestor.right) {
                    // Take over rebalance responsibility.
                    *rebalance_subtree = &n.right;
                }
                n.weight = ancestor.weight;
                n.left = ancestor.left;
                n.right = ancestor.right;
                *subtree_replace = ptr_this;
            } else {
                to_free = ptr_this;
            }
            this->node_free(to_free);
        } else if (n.right.is_null()) {
            // Delete n and let parent point to n.left
            subtree to_free = *subtreep;
            *subtreep = n.left;
            paranoid_invariant(idx > 0);
            paranoid_invariant_null(subtree_replace);  // To be recursive, we're looking for index 0.  n is index > 0 here.
            this->node_free(to_free);
        } else {
            if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 0, -1)) {
                *rebalance_subtree = subtreep;
            }
            // don't need to copy up value, it's only used by this
            // next call, and when that gets to the bottom there
            // won't be any more recursion
            n.weight--;
            this->delete_internal(&n.right, 0, subtreep, rebalance_subtree);
        }
    } else {
        n.weight--;
        if (*rebalance_subtree == nullptr && this->will_need_rebalance(*subtreep, 0, -1)) {
            *rebalance_subtree = subtreep;
        }
        this->delete_internal(&n.right, idx - leftweight - 1, subtree_replace, rebalance_subtree);
    }
}
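/*
 * Reader's note on the interior-node case above (illustration, not upstream text):
 * when the node to delete has two children, delete_internal() recurses into the
 * right subtree looking for index 0 (the in-order successor), passing the doomed
 * node's slot as 'subtree_replace'.  When the successor is found, it is unlinked
 * from the bottom, takes over the doomed node's weight and children, and is
 * spliced into the doomed node's slot; the doomed node's storage is freed.  No
 * value bytes are ever copied between nodes.
 */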
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename iterate_extra_t, int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_internal_array(const uint32_t left, const uint32_t right, iterate_extra_t *const iterate_extra) const {
    int r;
    for (uint32_t i = left; i < right; ++i) {
        r = f(this->value_length, *get_array_value(i), i, iterate_extra);
        if (r != 0) {
            return r;
        }
    }
    return 0;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename iterate_extra_t, int (*f)(const uint32_t, dmtdata_t *, const uint32_t, iterate_extra_t *const)>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_ptr_internal(const uint32_t left, const uint32_t right, const subtree &subtree, const uint32_t idx, iterate_extra_t *const iterate_extra) {
    if (!subtree.is_null()) {
        dmt_node &n = get_node(subtree);
        const uint32_t idx_root = idx + this->nweight(n.left);
        if (left < idx_root) {
            this->iterate_ptr_internal<iterate_extra_t, f>(left, right, n.left, idx, iterate_extra);
        }
        if (left <= idx_root && idx_root < right) {
            int r = f(n.value_length, &n.value, idx_root, iterate_extra);
            lazy_assert_zero(r);
        }
        if (idx_root + 1 < right) {
            this->iterate_ptr_internal<iterate_extra_t, f>(left, right, n.right, idx_root + 1, iterate_extra);
        }
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename iterate_extra_t, int (*f)(const uint32_t, dmtdata_t *, const uint32_t, iterate_extra_t *const)>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_ptr_internal_array(const uint32_t left, const uint32_t right, iterate_extra_t *const iterate_extra) {
    for (uint32_t i = left; i < right; ++i) {
        int r = f(this->value_length, get_array_value(i), i, iterate_extra);
        lazy_assert_zero(r);
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename iterate_extra_t, int (*f)(const uint32_t, const dmtdata_t &, const uint32_t, iterate_extra_t *const)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::iterate_internal(const uint32_t left, const uint32_t right, const subtree &subtree, const uint32_t idx, iterate_extra_t *const iterate_extra) const {
    if (subtree.is_null()) { return 0; }
    int r;
    const dmt_node &n = get_node(subtree);
    const uint32_t idx_root = idx + this->nweight(n.left);
    if (left < idx_root) {
        r = this->iterate_internal<iterate_extra_t, f>(left, right, n.left, idx, iterate_extra);
        if (r != 0) { return r; }
    }
    if (left <= idx_root && idx_root < right) {
        r = f(n.value_length, n.value, idx_root, iterate_extra);
        if (r != 0) { return r; }
    }
    if (idx_root + 1 < right) {
        return this->iterate_internal<iterate_extra_t, f>(left, right, n.right, idx_root + 1, iterate_extra);
    }
    return 0;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fetch_internal_array(const uint32_t i, uint32_t *const value_len, dmtdataout_t *const value) const {
    copyout(value_len, value, this->value_length, get_array_value(i));
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fetch_internal(const subtree &subtree, const uint32_t i, uint32_t *const value_len, dmtdataout_t *const value) const {
    dmt_node &n = get_node(subtree);
    const uint32_t leftweight = this->nweight(n.left);
    if (i < leftweight) {
        this->fetch_internal(n.left, i, value_len, value);
    } else if (i == leftweight) {
        copyout(value_len, value, &n);
    } else {
        this->fetch_internal(n.right, i - leftweight - 1, value_len, value);
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::fill_array_with_subtree_offsets(node_offset *const array, const subtree &subtree) const {
    if (!subtree.is_null()) {
        const dmt_node &tree = get_node(subtree);
        this->fill_array_with_subtree_offsets(&array[0], tree.left);
        array[this->nweight(tree.left)] = subtree.get_offset();
        this->fill_array_with_subtree_offsets(&array[this->nweight(tree.left) + 1], tree.right);
    }
}
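/*
 * Illustration of the flatten/rebuild pair (not upstream text):
 * fill_array_with_subtree_offsets() above writes the node offsets out in in-order
 * (sorted) order; rebuild_subtree_from_offsets() below picks the middle element
 * as the root and recurses.  E.g., for offsets [a, b, c, d, e] (numvalues == 5,
 * halfway == 2), c becomes the root with weight 5, [a, b] build the left subtree,
 * and [d, e] build the right subtree, yielding a balanced tree without moving any
 * node's value bytes.
 */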
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::rebuild_subtree_from_offsets(subtree *const subtree, const node_offset *const offsets, const uint32_t numvalues) {
    if (numvalues == 0) {
        subtree->set_to_null();
    } else {
        uint32_t halfway = numvalues / 2;
        subtree->set_offset(offsets[halfway]);
        dmt_node &newnode = get_node(offsets[halfway]);
        newnode.weight = numvalues;
        // value is already in there.
        this->rebuild_subtree_from_offsets(&newnode.left, &offsets[0], halfway);
        this->rebuild_subtree_from_offsets(&newnode.right, &offsets[halfway + 1], numvalues - (halfway + 1));
    }
}

//TODO(leif): Note that this can mess with our memory_footprint calculation (we may touch past
//            what is marked as 'used' in the mempool)
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
node_offset* dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::alloc_temp_node_offsets(uint32_t num_offsets) {
    size_t mem_needed = num_offsets * sizeof(node_offset);
    size_t mem_free;
    mem_free = toku_mempool_get_free_size(&this->mp);
    node_offset* CAST_FROM_VOIDP(tmp, toku_mempool_get_next_free_ptr(&this->mp));
    if (mem_free >= mem_needed) {
        return tmp;
    }
    return nullptr;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::rebalance(subtree *const subtree) {
    paranoid_invariant(!subtree->is_null());

    // There is a possible "optimization" here:
    //   if (this->values_same_size && subtree == &this->d.t.root) {
    //       this->convert_from_tree_to_array();
    //       return;
    //   }
    // but we don't want to do it because it involves actually copying values around
    // as opposed to stopping in the middle of rebalancing (like in the OMT)

    node_offset offset = subtree->get_offset();
    const dmt_node &n = get_node(offset);
    node_offset *tmp_array;
    bool malloced = false;
    tmp_array = alloc_temp_node_offsets(n.weight);
    if (!tmp_array) {
        malloced = true;
        XMALLOC_N(n.weight, tmp_array);
    }
    this->fill_array_with_subtree_offsets(tmp_array, *subtree);
    this->rebuild_subtree_from_offsets(subtree, tmp_array, n.weight);
    if (malloced) toku_free(tmp_array);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t *const out, const dmt_node *const n) {
    if (outlen) {
        *outlen = n->value_length;
    }
    if (out) {
        *out = n->value;
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t **const out, dmt_node *const n) {
    if (outlen) {
        *outlen = n->value_length;
    }
    if (out) {
        *out = &n->value;
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t *const out, const uint32_t len, const dmtdata_t *const stored_value_ptr) {
    if (outlen) {
        *outlen = len;
    }
    if (out) {
        *out = *stored_value_ptr;
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::copyout(uint32_t *const outlen, dmtdata_t **const out, const uint32_t len, dmtdata_t *const stored_value_ptr) {
    if (outlen) {
        *outlen = len;
    }
    if (out) {
        *out = stored_value_ptr;
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename dmtcmp_t, int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_zero_array(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
    paranoid_invariant_notnull(idxp);
    uint32_t min = 0;
    uint32_t limit = this->d.a.num_values;
    uint32_t best_pos = subtree::NODE_NULL;
    uint32_t best_zero = subtree::NODE_NULL;

    while (min != limit) {
        uint32_t mid = (min + limit) / 2;
        int hv = h(this->value_length, *get_array_value(mid), extra);
        if (hv < 0) {
            min = mid + 1;
        }
        else if (hv > 0) {
            best_pos = mid;
            limit = mid;
        }
        else {
            best_zero = mid;
            limit = mid;
        }
    }
    if (best_zero != subtree::NODE_NULL) {
        // Found a zero
        copyout(value_len, value, this->value_length, get_array_value(best_zero));
        *idxp = best_zero;
        return 0;
    }
    if (best_pos != subtree::NODE_NULL) *idxp = best_pos;
    else *idxp = this->d.a.num_values;
    return DB_NOTFOUND;
}
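/*
 * Worked trace of the binary search above (illustration only): searching the array
 * [10, 20, 30, 40] for key 25 with a standard heaviside (h < 0 while stored < key)
 * visits mid = 2 (h(30) > 0, so best_pos = 2, limit = 2), then mid = 1
 * (h(20) < 0, so min = 2), then stops.  best_zero was never set, so the function
 * returns DB_NOTFOUND with *idxp == 2: the index at which 25 would be inserted.
 */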
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename dmtcmp_t, int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_zero(const subtree &subtree, const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
    paranoid_invariant_notnull(idxp);
    if (subtree.is_null()) {
        *idxp = 0;
        return DB_NOTFOUND;
    }
    dmt_node &n = get_node(subtree);
    int hv = h(n.value_length, n.value, extra);
    if (hv < 0) {
        int r = this->find_internal_zero<dmtcmp_t, h>(n.right, extra, value_len, value, idxp);
        *idxp += this->nweight(n.left) + 1;
        return r;
    } else if (hv > 0) {
        return this->find_internal_zero<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
    } else {
        int r = this->find_internal_zero<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
        if (r == DB_NOTFOUND) {
            *idxp = this->nweight(n.left);
            copyout(value_len, value, &n);
            r = 0;
        }
        return r;
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename dmtcmp_t, int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_plus_array(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
    paranoid_invariant_notnull(idxp);
    uint32_t min = 0;
    uint32_t limit = this->d.a.num_values;
    uint32_t best = subtree::NODE_NULL;

    while (min != limit) {
        const uint32_t mid = (min + limit) / 2;
        const int hv = h(this->value_length, *get_array_value(mid), extra);
        if (hv > 0) {
            best = mid;
            limit = mid;
        } else {
            min = mid + 1;
        }
    }
    if (best == subtree::NODE_NULL) { return DB_NOTFOUND; }
    copyout(value_len, value, this->value_length, get_array_value(best));
    *idxp = best;
    return 0;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename dmtcmp_t, int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_plus(const subtree &subtree, const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
    paranoid_invariant_notnull(idxp);
    if (subtree.is_null()) {
        return DB_NOTFOUND;
    }
    dmt_node & n = get_node(subtree);
    int hv = h(n.value_length, n.value, extra);
    int r;
    if (hv > 0) {
        r = this->find_internal_plus<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
        if (r == DB_NOTFOUND) {
            *idxp = this->nweight(n.left);
            copyout(value_len, value, &n);
            r = 0;
        }
    } else {
        r = this->find_internal_plus<dmtcmp_t, h>(n.right, extra, value_len, value, idxp);
        if (r == 0) {
            *idxp += this->nweight(n.left) + 1;
        }
    }
    return r;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename dmtcmp_t, int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_minus_array(const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
    paranoid_invariant_notnull(idxp);
    uint32_t min = 0;
    uint32_t limit = this->d.a.num_values;
    uint32_t best = subtree::NODE_NULL;

    while (min != limit) {
        const uint32_t mid = (min + limit) / 2;
        const int hv = h(this->value_length, *get_array_value(mid), extra);
        if (hv < 0) {
            best = mid;
            min = mid + 1;
        } else {
            limit = mid;
        }
    }
    if (best == subtree::NODE_NULL) { return DB_NOTFOUND; }
    copyout(value_len, value, this->value_length, get_array_value(best));
    *idxp = best;
    return 0;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
template<typename dmtcmp_t, int (*h)(const uint32_t, const dmtdata_t &, const dmtcmp_t &)>
int dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::find_internal_minus(const subtree &subtree, const dmtcmp_t &extra, uint32_t *const value_len, dmtdataout_t *const value, uint32_t *const idxp) const {
    paranoid_invariant_notnull(idxp);
    if (subtree.is_null()) {
        return DB_NOTFOUND;
    }
    dmt_node & n = get_node(subtree);
    int hv = h(n.value_length, n.value, extra);
    if (hv < 0) {
        int r = this->find_internal_minus<dmtcmp_t, h>(n.right, extra, value_len, value, idxp);
        if (r == 0) {
            *idxp += this->nweight(n.left) + 1;
        } else if (r == DB_NOTFOUND) {
            *idxp = this->nweight(n.left);
            copyout(value_len, value, &n);
            r = 0;
        }
        return r;
    } else {
        return this->find_internal_minus<dmtcmp_t, h>(n.left, extra, value_len, value, idxp);
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_fixed_length(void) const {
    return this->values_same_size ? this->value_length : 0;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
uint32_t dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::get_fixed_length_alignment_overhead(void) const {
    return this->values_same_size ? align(this->value_length) - this->value_length : 0;
}
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
bool dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::value_length_is_fixed(void) const {
    return this->values_same_size;
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::serialize_values(uint32_t expected_unpadded_memory, struct wbuf *wb) const {
    invariant(this->is_array);
    invariant(this->values_same_size);
    const uint8_t pad_bytes = get_fixed_length_alignment_overhead();
    const uint32_t fixed_len = this->value_length;
    const uint32_t fixed_aligned_len = align(this->value_length);
    paranoid_invariant(expected_unpadded_memory == this->d.a.num_values * this->value_length);
    paranoid_invariant(toku_mempool_get_used_size(&this->mp) >=
                       expected_unpadded_memory + pad_bytes * this->d.a.num_values);
    if (this->d.a.num_values == 0) {
        // Nothing to serialize
    } else if (pad_bytes == 0) {
        // Basically a memcpy
        wbuf_nocrc_literal_bytes(wb, get_array_value(0), expected_unpadded_memory);
    } else {
        uint8_t* const dest = wbuf_nocrc_reserve_literal_bytes(wb, expected_unpadded_memory);
        const uint8_t* const src = reinterpret_cast<const uint8_t*>(get_array_value(0));
        //TODO(leif): maybe look at vectorization here
        for (uint32_t i = 0; i < this->d.a.num_values; i++) {
            memcpy(&dest[i * fixed_len], &src[i * fixed_aligned_len], fixed_len);
        }
    }
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::create(uint32_t _max_values, uint32_t _max_value_bytes) {
    this->max_values = _max_values;
    this->max_value_bytes = _max_value_bytes;
    this->temp.create();
    paranoid_invariant_null(toku_mempool_get_base(&this->temp.mp));
    this->temp_valid = true;
    this->sorted_node_offsets = nullptr;
    // Include enough space for alignment padding
    size_t initial_space = (ALIGNMENT - 1) * _max_values + _max_value_bytes;

    toku_mempool_construct(&this->temp.mp, initial_space);  // Adds 25%
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::append(const dmtwriter_t &value) {
    paranoid_invariant(this->temp_valid);
    //NOTE: Always use d.a.num_values for size because we have not yet created root.
    if (this->temp.values_same_size && (this->temp.d.a.num_values == 0 || value.get_size() == this->temp.value_length)) {
        this->temp.insert_at_array_end<false>(value);
        return;
    }
    if (this->temp.is_array) {
        // Convert to tree format (without weights and linkage)
        XMALLOC_N(this->max_values, this->sorted_node_offsets);

        // Include enough space for alignment padding
        size_t mem_needed = (ALIGNMENT - 1 + __builtin_offsetof(dmt_node, value)) * max_values + max_value_bytes;
        struct mempool old_mp = this->temp.mp;
        const uint32_t num_values = this->temp.d.a.num_values;
        toku_mempool_construct(&this->temp.mp, mem_needed);

        // Copy over and get node_offsets
        for (uint32_t i = 0; i < num_values; i++) {
            dmtwriter_t writer(this->temp.value_length, this->temp.get_array_value_internal(&old_mp, i));
            this->sorted_node_offsets[i] = this->temp.node_malloc_and_set_value(writer);
        }
        this->temp.is_array = false;
        this->temp.values_same_size = false;
        this->temp.value_length = 0;
        toku_mempool_destroy(&old_mp);
    }
    paranoid_invariant(!this->temp.is_array);
    this->sorted_node_offsets[this->temp.d.a.num_values++] = this->temp.node_malloc_and_set_value(value);
}

template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
bool dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::value_length_is_fixed(void) {
    paranoid_invariant(this->temp_valid);
    return this->temp.values_same_size;
}
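/*
 * Builder usage sketch (hypothetical, not upstream code): the builder constructs a
 * dmt from presorted values in one pass, for the cases where
 * create_from_sorted_memory_of_fixed_size_elements() cannot be used because the
 * values may vary in length:
 *
 *   typename dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder b;
 *   b.create(num_values, total_value_bytes);
 *   for (uint32_t i = 0; i < num_values; i++) {
 *       b.append(writers[i]);  // writers[] assumed already sorted
 *   }
 *   dmt<dmtdata_t, dmtdataout_t, dmtwriter_t> m;
 *   b.build(&m);  // b is no longer valid; m owns the memory
 */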
template<typename dmtdata_t, typename dmtdataout_t, typename dmtwriter_t>
void dmt<dmtdata_t, dmtdataout_t, dmtwriter_t>::builder::build(dmt *dest) {
    invariant(this->temp_valid);
    //NOTE: Always use d.a.num_values for size because we have not yet created root.
    invariant(this->temp.d.a.num_values <= this->max_values);
    // Memory invariant is taken care of incrementally (during append())

    if (!this->temp.is_array) {
        invariant_notnull(this->sorted_node_offsets);
        this->temp.rebuild_subtree_from_offsets(&this->temp.d.t.root, this->sorted_node_offsets, this->temp.d.a.num_values);
        toku_free(this->sorted_node_offsets);
        this->sorted_node_offsets = nullptr;
    }
    paranoid_invariant_null(this->sorted_node_offsets);

    const size_t used = toku_mempool_get_used_size(&this->temp.mp);
    const size_t allocated = toku_mempool_get_size(&this->temp.mp);
    // We want to use no more than (about) the actual used space + 25% overhead for mempool growth.
    // When we know the elements are fixed-length, we use the better dmt constructor.
    // In practice, as of Jan 2014, we use the builder in two cases:
    //  - When we know the elements are not fixed-length.
    //  - During upgrade of a pre-version-26 basement node.
    // During upgrade, we will probably wildly overallocate because we don't account for the
    // values that aren't stored in the dmt, so here we want to shrink the mempool.
    // When we know the elements are not fixed-length, we still know how much memory they occupy
    // in total, modulo alignment, so we want to allow for mempool overhead and worst-case
    // alignment overhead, and not shrink the mempool.
    const size_t max_allowed = used + (ALIGNMENT - 1) * this->temp.size();
    const size_t max_allowed_with_mempool_overhead = max_allowed + max_allowed / 4;
    //TODO(leif): get footprint calculation correct (under jemalloc) and add some form of footprint constraint
    if (allocated > max_allowed_with_mempool_overhead) {
        // Reallocate smaller mempool to save memory
        invariant_zero(toku_mempool_get_frag_size(&this->temp.mp));
        struct mempool new_mp;
        toku_mempool_construct(&new_mp, used);
        void * newbase = toku_mempool_malloc(&new_mp, used);
        invariant_notnull(newbase);
        memcpy(newbase, toku_mempool_get_base(&this->temp.mp), used);
        toku_mempool_destroy(&this->temp.mp);
        this->temp.mp = new_mp;
    }

    *dest = this->temp;
    this->temp_valid = false;
}

} // namespace toku
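/*
 * End-to-end usage sketch (hypothetical, not upstream code), tying the pieces
 * together for a dmt of fixed-size values:
 *
 *   dmt<uint32_t, uint32_t, some_u32_writer> m;
 *   m.create();                       // starts in array form
 *   m.insert_at(some_u32_writer(7), 0);
 *   uint32_t len, v;
 *   int r = m.fetch(0, &len, &v);     // r == 0, len == 4, v == 7
 *   m.prepare_for_serialize();        // forces array form for serialize_values()
 *   m.destroy();
 *
 * 'some_u32_writer' stands in for a dmtwriter_t: per the template code above, a
 * writer must provide get_size(), write_to(dmtdata_t *), and a
 * (uint32_t value_length, dmtdata_t *src) constructor.
 */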