Diffstat (limited to 'storage/tokudb/ft-index/ft/cachetable.cc')
-rw-r--r--  storage/tokudb/ft-index/ft/cachetable.cc | 262
1 file changed, 171 insertions(+), 91 deletions(-)
diff --git a/storage/tokudb/ft-index/ft/cachetable.cc b/storage/tokudb/ft-index/ft/cachetable.cc
index 4bfe8d90379..fb427c5349b 100644
--- a/storage/tokudb/ft-index/ft/cachetable.cc
+++ b/storage/tokudb/ft-index/ft/cachetable.cc
@@ -106,6 +106,7 @@ PATENT RIGHTS GRANT:
#include <portability/toku_time.h>
#include <util/rwlock.h>
#include <util/status.h>
+#include <util/context.h>
///////////////////////////////////////////////////////////////////////////////////
// Engine status
@@ -291,7 +292,10 @@ uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) {
// reserve 25% as "unreservable". The loader cannot have it.
#define unreservable_memory(size) ((size)/4)
-void toku_cachetable_create(CACHETABLE *result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) {
+int toku_cachetable_create(CACHETABLE *ct_result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) {
+ int result = 0;
+ int r;
+
if (size_limit == 0) {
size_limit = 128*1024*1024;
}
@@ -301,16 +305,46 @@ void toku_cachetable_create(CACHETABLE *result, long size_limit, LSN UU(initial_
ct->cf_list.init();
int num_processors = toku_os_get_number_active_processors();
- ct->client_kibbutz = toku_kibbutz_create(num_processors);
- ct->ct_kibbutz = toku_kibbutz_create(2*num_processors);
int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1;
- ct->checkpointing_kibbutz = toku_kibbutz_create(checkpointing_nworkers);
+ r = toku_kibbutz_create(num_processors, &ct->client_kibbutz);
+ if (r != 0) {
+ result = r;
+ goto cleanup;
+ }
+ r = toku_kibbutz_create(2*num_processors, &ct->ct_kibbutz);
+ if (r != 0) {
+ result = r;
+ goto cleanup;
+ }
+ r = toku_kibbutz_create(checkpointing_nworkers, &ct->checkpointing_kibbutz);
+ if (r != 0) {
+ result = r;
+ goto cleanup;
+ }
// must be done after creating ct_kibbutz
- ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD);
- ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list);
- ct->cl.init(1, &ct->list, ct); // by default, start with one iteration
+ r = ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD);
+ if (r != 0) {
+ result = r;
+ goto cleanup;
+ }
+ r = ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list);
+ if (r != 0) {
+ result = r;
+ goto cleanup;
+ }
+ r = ct->cl.init(1, &ct->list, ct); // by default, start with one iteration
+ if (r != 0) {
+ result = r;
+ goto cleanup;
+ }
ct->env_dir = toku_xstrdup(".");
- *result = ct;
+cleanup:
+ if (result == 0) {
+ *ct_result = ct;
+ } else {
+ toku_cachetable_close(&ct);
+ }
+ return result;
}
// Returns a pointer to the checkpoint contained within
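Note: the rewritten constructor follows the classic C-style goto-cleanup idiom. Each fallible step records its error code and jumps to a single exit point that either publishes the object or tears down whatever was built so far (here by delegating to toku_cachetable_close, whose null checks appear later in this patch). A minimal self-contained sketch of the idiom, using hypothetical names rather than the TokuFT API:

    #include <cerrno>
    #include <cstdlib>

    struct widget {
        int *buf_a;
        int *buf_b;
    };

    // Returns 0 on success, an errno value on failure; on failure nothing leaks.
    static int widget_create(widget **out) {
        int result = 0;
        widget *w = (widget *) calloc(1, sizeof(*w));  // zeroed, so cleanup is safe
        if (w == nullptr) {
            return ENOMEM;
        }
        w->buf_a = (int *) malloc(1024 * sizeof(int));
        if (w->buf_a == nullptr) {
            result = ENOMEM;
            goto cleanup;
        }
        w->buf_b = (int *) malloc(1024 * sizeof(int));
        if (w->buf_b == nullptr) {
            result = ENOMEM;
            goto cleanup;
        }
    cleanup:
        if (result == 0) {
            *out = w;          // publish only on full success
        } else {
            free(w->buf_a);    // free(NULL) is a no-op, like the guarded close
            free(w);
        }
        return result;
    }

The zero-initialized allocation is what makes the failure path safe: every pointer the cleanup code might release is either valid or NULL.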
@@ -618,39 +652,6 @@ static void cachetable_free_pair(PAIR p) {
ctpair_destroy(p);
}
-// Maybe remove a pair from the cachetable and free it, depending on whether
-// or not there are any threads interested in the pair. The flush callback
-// is called with write_me and keep_me both false, and the pair is destroyed.
-// The sole purpose of this function is to remove the node, so the write_me
-// argument to the flush callback is false, and the flush callback won't do
-// anything except destroy the node.
-//
-// on input, pair_list's write lock is held and PAIR's mutex is held
-// on exit, only the pair_list's write lock is still held
-//
-static void cachetable_maybe_remove_and_free_pair (
- pair_list* pl,
- evictor* ev,
- PAIR p
- )
-{
- // this ensures that a clone running in the background first completes
- if (p->value_rwlock.users() == 0 && p->refcount == 0) {
- // assumption is that if we are about to remove the pair
- // that no one has grabbed the disk_nb_mutex,
- // and that there is no cloned_value_data, because
- // no one is writing a cloned value out.
- assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
- assert(p->cloned_value_data == NULL);
- cachetable_remove_pair(pl, ev, p);
- pair_unlock(p);
- cachetable_free_pair(p);
- }
- else {
- pair_unlock(p);
- }
-}
-
// assumes value_rwlock and disk_nb_mutex held on entry
// responsibility of this function is to only write a locked PAIR to disk
// and NOTHING else. We do not manipulate the state of the PAIR
@@ -774,7 +775,7 @@ static void cachetable_evicter(void* extra) {
static void cachetable_partial_eviction(void* extra) {
PAIR p = (PAIR)extra;
CACHEFILE cf = p->cachefile;
- p->ev->do_partial_eviction(p, false);
+ p->ev->do_partial_eviction(p);
bjm_remove_background_job(cf->bjm);
}
@@ -1483,6 +1484,8 @@ static bool try_pin_pair(
bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
if (partial_fetch_required) {
+ toku::context pf_ctx(CTX_PARTIAL_FETCH);
+
if (ct->ev.should_client_thread_sleep() && !already_slept) {
pair_lock(p);
unpin_pair(p, (lock_type == PL_READ));
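Note: toku::context (from the newly included util/context.h) is evidently an RAII scope marker: constructing it tags the current thread with an engine-status context such as CTX_PARTIAL_FETCH, and its destructor restores the previous context on every exit path, which is why the patch can drop one-line declarations into these branches without touching the surrounding control flow. A minimal sketch of that pattern, assuming a thread-local current-context variable and a hypothetical enum:

    enum ctx_id { CTX_DEFAULT, CTX_FULL_FETCH, CTX_PARTIAL_FETCH };

    // One current context per thread; each scope remembers its parent.
    static thread_local ctx_id tl_current_ctx = CTX_DEFAULT;

    class scoped_context {
        ctx_id m_saved;  // context that was active when this scope began
    public:
        explicit scoped_context(ctx_id id) : m_saved(tl_current_ctx) {
            tl_current_ctx = id;
        }
        ~scoped_context() {
            tl_current_ctx = m_saved;  // restored on every exit path
        }
        // non-copyable: a copy would double-restore on destruction
        scoped_context(const scoped_context &) = delete;
        scoped_context &operator=(const scoped_context &) = delete;
    };

    static void fetch_something(bool partial) {
        if (partial) {
            scoped_context ctx(CTX_PARTIAL_FETCH);
            // ... blocking work is now attributed to "partial fetch" ...
        }   // destructor restores the caller's context here
    }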
@@ -1634,6 +1637,8 @@ beginning:
}
}
else {
+ toku::context fetch_ctx(CTX_FULL_FETCH);
+
ct->list.pair_unlock_by_fullhash(fullhash);
// we only want to sleep once per call to get_and_pin. If we have already
// slept and there is still cache pressure, then we might as
@@ -2038,10 +2043,7 @@ maybe_pin_pair(
if (retval == TOKUDB_TRY_AGAIN) {
unpin_pair(p, (lock_type == PL_READ));
}
- else {
- // just a sanity check
- assert(retval == 0);
- }
+ pair_touch(p);
pair_unlock(p);
return retval;
}
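Note: pair_touch presumably refreshes the PAIR's position in the cachetable's clock, so a pair pinned through this path must be swept past several more times before the evictor considers it a victim. A toy sketch of such a second-chance clock touch; the count field and constant are assumptions for illustration, not the real PAIR layout:

    const int CLOCK_INITIAL_COUNT = 5;  // assumed value

    struct clock_pair {
        int count;   // decremented by the evictor's sweep
    };

    // Called whenever the pair is pinned: refresh its clock value so a
    // recently used pair survives several sweeps before becoming a victim.
    static void clock_touch(clock_pair *p) {
        p->count = CLOCK_INITIAL_COUNT;
    }

    // One step of the evictor's sweep: give touched pairs another chance.
    static bool clock_wants_eviction(clock_pair *p) {
        if (p->count > 0) {
            --p->count;
            return false;
        }
        return true;   // untouched since the count ran out: evict
    }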
@@ -2071,6 +2073,8 @@ try_again:
ct->list.pair_lock_by_fullhash(fullhash);
PAIR p = ct->list.find_pair(cf, key, fullhash);
if (p == NULL) {
+ toku::context fetch_ctx(CTX_FULL_FETCH);
+
// Not found
ct->list.pair_unlock_by_fullhash(fullhash);
ct->list.write_list_lock();
@@ -2146,6 +2150,8 @@ try_again:
// still check for partial fetch
bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
if (partial_fetch_required) {
+ toku::context fetch_ctx(CTX_PARTIAL_FETCH);
+
run_unlockers(unlockers);
// we are now getting an expensive write lock, because we
@@ -2428,10 +2434,10 @@ static void remove_pair_for_close(PAIR p, CACHETABLE ct, bool completely) {
assert(p->dirty == CACHETABLE_CLEAN);
assert(p->refcount == 0);
if (completely) {
- // TODO: maybe break up this function
- // so that write lock does not need to be held for entire
- // free
- cachetable_maybe_remove_and_free_pair(&ct->list, &ct->ev, p);
+ cachetable_remove_pair(&ct->list, &ct->ev, p);
+ pair_unlock(p);
+ // TODO: Eventually, we should not hold the write list lock during free
+ cachetable_free_pair(p);
}
else {
// if we are not evicting completely,
@@ -2587,9 +2593,12 @@ void toku_cachetable_close (CACHETABLE *ctp) {
ct->list.destroy();
ct->cf_list.destroy();
- toku_kibbutz_destroy(ct->client_kibbutz);
- toku_kibbutz_destroy(ct->ct_kibbutz);
- toku_kibbutz_destroy(ct->checkpointing_kibbutz);
+ if (ct->client_kibbutz)
+ toku_kibbutz_destroy(ct->client_kibbutz);
+ if (ct->ct_kibbutz)
+ toku_kibbutz_destroy(ct->ct_kibbutz);
+ if (ct->checkpointing_kibbutz)
+ toku_kibbutz_destroy(ct->checkpointing_kibbutz);
toku_free(ct->env_dir);
toku_free(ct);
*ctp = 0;
@@ -3074,20 +3083,29 @@ int toku_cleaner_thread (void *cleaner_v) {
//
ENSURE_POD(cleaner);
-void cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) {
+int cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) {
// default is no cleaner, for now
- toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this);
+ m_cleaner_cron_init = false;
+ int r = toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this);
+ if (r == 0) {
+ m_cleaner_cron_init = true;
+ }
TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_cleaner_iterations, sizeof m_cleaner_iterations);
m_cleaner_iterations = _cleaner_iterations;
m_pl = _pl;
m_ct = _ct;
+ m_cleaner_init = true;
+ return r;
}
// this function is allowed to be called multiple times
void cleaner::destroy(void) {
- if (!toku_minicron_has_been_shutdown(&m_cleaner_cron)) {
+ if (!m_cleaner_init) {
+ return;
+ }
+ if (m_cleaner_cron_init && !toku_minicron_has_been_shutdown(&m_cleaner_cron)) {
// for test code only, production code uses toku_cachetable_minicron_shutdown()
- int r = toku_minicron_shutdown(&m_cleaner_cron);
+ int r = toku_minicron_shutdown(&m_cleaner_cron);
assert(r==0);
}
}
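Note: cleaner::destroy is now safe to call on a partially constructed object. m_cleaner_init records whether init() ran at all, and m_cleaner_cron_init records whether the minicron specifically came up, so each teardown step is guarded by its matching flag. The shape of the pattern, sketched against a hypothetical fallible subsystem:

    // stand-ins for a fallible subsystem (assumed, not the TokuFT API)
    static int timer_setup(int /*period*/) { return 0; }   // 0 on success
    static void timer_shutdown() {}

    class cleaner_like {
        bool m_init = false;        // did init() run at all?
        bool m_timer_init = false;  // did the timer specifically come up?
    public:
        int init() {
            int r = timer_setup(0);
            if (r == 0) {
                m_timer_init = true;
            }
            m_init = true;      // set even on failure: destroy() must still run
            return r;
        }
        // allowed to be called multiple times, and after a failed init
        void destroy() {
            if (!m_init) {
                return;                 // init() never ran: nothing to undo
            }
            if (m_timer_init) {
                timer_shutdown();
                m_timer_init = false;   // makes a second destroy() a no-op
            }
            m_init = false;
        }
    };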
@@ -3122,6 +3140,8 @@ void cleaner::set_period(uint32_t new_period) {
// start). At this point, we can safely unlock the cachetable, do the
// work (callback), and unlock/release our claim to the cachefile.
int cleaner::run_cleaner(void) {
+ toku::context cleaner_ctx(CTX_CLEANER);
+
int r;
uint32_t num_iterations = this->get_iterations();
for (uint32_t i = 0; i < num_iterations; ++i) {
@@ -3662,7 +3682,7 @@ static void *eviction_thread(void *evictor_v) {
// Starts the eviction thread, assigns external object references,
// and initializes all counters and condition variables.
//
-void evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) {
+int evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) {
TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_ev_thread_is_running, sizeof m_ev_thread_is_running);
TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_size_evicting, sizeof m_size_evicting);
@@ -3716,8 +3736,13 @@ void evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, K
// start the background thread
m_run_thread = true;
m_num_eviction_thread_runs = 0;
+ m_ev_thread_init = false;
r = toku_pthread_create(&m_ev_thread, NULL, eviction_thread, this);
- assert_zero(r);
+ if (r == 0) {
+ m_ev_thread_init = true;
+ }
+ m_evictor_init = true;
+ return r;
}
//
@@ -3725,7 +3750,10 @@ void evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, K
//
// NOTE: This should only be called if there are no evictions in progress.
//
-void evictor::destroy() {
+void evictor::destroy() {
+ if (!m_evictor_init) {
+ return;
+ }
assert(m_size_evicting == 0);
//
// commented out of Ming, because we could not finish
@@ -3734,16 +3762,16 @@ void evictor::destroy() {
//assert(m_size_current == 0);
// Stop the eviction thread.
- toku_mutex_lock(&m_ev_thread_lock);
- m_run_thread = false;
- this->signal_eviction_thread();
- toku_mutex_unlock(&m_ev_thread_lock);
-
- void *ret;
- int r = toku_pthread_join(m_ev_thread, &ret);
- assert_zero(r);
- assert(!m_ev_thread_is_running);
-
+ if (m_ev_thread_init) {
+ toku_mutex_lock(&m_ev_thread_lock);
+ m_run_thread = false;
+ this->signal_eviction_thread();
+ toku_mutex_unlock(&m_ev_thread_lock);
+ void *ret;
+ int r = toku_pthread_join(m_ev_thread, &ret);
+ assert_zero(r);
+ assert(!m_ev_thread_is_running);
+ }
destroy_partitioned_counter(m_size_nonleaf);
m_size_nonleaf = NULL;
destroy_partitioned_counter(m_size_leaf);
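Note: the same staged-teardown discipline applies to the eviction thread. init() now reports a failed toku_pthread_create instead of asserting, and destroy() joins the thread only when m_ev_thread_init says it actually started, since joining a thread that was never created is undefined behavior. A minimal pthread sketch of that create/join handshake, with assumed names:

    #include <pthread.h>
    #include <atomic>
    #include <cassert>

    struct bg_worker {
        pthread_t thread;
        bool thread_init = false;        // did pthread_create() succeed?
        std::atomic<bool> run{false};
    };

    static void *bg_main(void *arg) {
        bg_worker *w = static_cast<bg_worker *>(arg);
        while (w->run.load(std::memory_order_acquire)) {
            // ... periodic background work would go here ...
        }
        return nullptr;
    }

    // returns 0 on success, else the pthread_create error code
    static int bg_worker_start(bg_worker *w) {
        w->run.store(true, std::memory_order_release);
        int r = pthread_create(&w->thread, nullptr, bg_main, w);
        if (r == 0) {
            w->thread_init = true;       // only then is a join ever legal
        }
        return r;
    }

    static void bg_worker_stop(bg_worker *w) {
        if (!w->thread_init) {
            return;                      // thread never started: nothing to join
        }
        w->run.store(false, std::memory_order_release);
        void *ret;
        int r = pthread_join(w->thread, &ret);
        assert(r == 0);
        w->thread_init = false;
    }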
@@ -4007,6 +4035,8 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
m_pl->read_list_unlock();
ret_val = true;
if (curr_in_clock->count > 0) {
+ toku::context pe_ctx(CTX_PARTIAL_EVICTION);
+
uint32_t curr_size = curr_in_clock->attr.size;
// if the size of this PAIR is greater than the average size of PAIRs
// in the cachetable, then decrement it, otherwise, decrement
@@ -4052,10 +4082,10 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
write_extraargs
);
if (cost == PE_CHEAP) {
+ pair_unlock(curr_in_clock);
curr_in_clock->size_evicting_estimate = 0;
- this->do_partial_eviction(curr_in_clock, true);
+ this->do_partial_eviction(curr_in_clock);
bjm_remove_background_job(cf->bjm);
- pair_unlock(curr_in_clock);
}
else if (cost == PE_EXPENSIVE) {
// only bother running an expensive partial eviction
@@ -4083,6 +4113,8 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
}
}
else {
+ toku::context pe_ctx(CTX_FULL_EVICTION);
+
// responsibility of try_evict_pair to eventually remove background job
// pair's mutex is still grabbed here
this->try_evict_pair(curr_in_clock);
@@ -4094,26 +4126,48 @@ exit:
return ret_val;
}
+struct pair_unpin_with_new_attr_extra {
+ pair_unpin_with_new_attr_extra(evictor *e, PAIR p) :
+ ev(e), pair(p) {
+ }
+ evictor *ev;
+ PAIR pair;
+};
+
+static void pair_unpin_with_new_attr(PAIR_ATTR new_attr, void *extra) {
+ struct pair_unpin_with_new_attr_extra *info =
+ reinterpret_cast<struct pair_unpin_with_new_attr_extra *>(extra);
+ PAIR p = info->pair;
+ evictor *ev = info->ev;
+
+ // change the attr in the evictor, then update the value in the pair
+ ev->change_pair_attr(p->attr, new_attr);
+ p->attr = new_attr;
+
+ // unpin
+ pair_lock(p);
+ p->value_rwlock.write_unlock();
+ pair_unlock(p);
+}
+
//
-// on entry and exit, pair's mutex is held if pair_mutex_held is true
+// on entry and exit, pair's mutex is not held
// on exit, PAIR is unpinned
//
-void evictor::do_partial_eviction(PAIR p, bool pair_mutex_held) {
- PAIR_ATTR new_attr;
+void evictor::do_partial_eviction(PAIR p) {
+ // Copy the old attr
PAIR_ATTR old_attr = p->attr;
-
- p->pe_callback(p->value_data, old_attr, &new_attr, p->write_extraargs);
+ long long size_evicting_estimate = p->size_evicting_estimate;
- this->change_pair_attr(old_attr, new_attr);
- p->attr = new_attr;
- this->decrease_size_evicting(p->size_evicting_estimate);
- if (!pair_mutex_held) {
- pair_lock(p);
- }
- p->value_rwlock.write_unlock();
- if (!pair_mutex_held) {
- pair_unlock(p);
- }
+ struct pair_unpin_with_new_attr_extra extra(this, p);
+ p->pe_callback(p->value_data, old_attr, p->write_extraargs,
+ // passed as the finalize continuation, which allows the
+ // pe_callback to unpin the node before doing expensive cleanup
+ pair_unpin_with_new_attr, &extra);
+
+ // now that the pe_callback (and its pair_unpin_with_new_attr continuation)
+ // have finished, we can safely decrease size_evicting
+ this->decrease_size_evicting(size_evicting_estimate);
}
//
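Note: this hunk changes the pe_callback contract from returning the new attribute through an out-parameter to continuation-passing style: the callback receives a finalize function plus a baton and invokes it as soon as the new size is known, so the pair can be unpinned before any expensive trailing cleanup runs. Observe that size_evicting_estimate is copied out first; once the continuation unpins the pair, another thread may free it, so p must not be touched afterwards. A stripped-down sketch of the calling convention, with hypothetical types:

    #include <cstdio>

    struct attr_t { long size; };

    // continuation type: delivers the post-eviction attr back to the caller
    typedef void (*pe_finalize_fn)(attr_t new_attr, void *extra);

    // A callback in the new style: shrink the value, hand the new attr to the
    // continuation first, then do slow cleanup while the pair is already unpinned.
    static void pe_callback(attr_t old_attr, pe_finalize_fn finalize, void *extra) {
        attr_t new_attr = { old_attr.size / 2 };   // pretend we evicted half
        finalize(new_attr, extra);                 // caller may unpin/free now
        // ... expensive trailing work (e.g. freeing the evicted buffers) ...
    }

    static void on_finalize(attr_t new_attr, void *extra) {
        long *accounted = static_cast<long *>(extra);
        *accounted = new_attr.size;                // update accounting, then unpin
    }

    int main() {
        attr_t old_attr = { 4096 };
        long accounted = old_attr.size;
        long estimate = old_attr.size;             // copied before the callback,
                                                   // like size_evicting_estimate
        pe_callback(old_attr, on_finalize, &accounted);
        std::printf("size %ld -> %ld, release estimate %ld\n",
                    old_attr.size, accounted, estimate);
        return 0;
    }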
@@ -4188,8 +4242,25 @@ void evictor::evict_pair(PAIR p, bool for_checkpoint) {
nb_mutex_unlock(&p->disk_nb_mutex);
// at this point, we have the pair list's write list lock
// and we have the pair's mutex (p->mutex) held
- cachetable_maybe_remove_and_free_pair(m_pl, this, p);
+
+ // this ensures that a clone running in the background first completes
+ bool removed = false;
+ if (p->value_rwlock.users() == 0 && p->refcount == 0) {
+ // assumption is that if we are about to remove the pair
+ // that no one has grabbed the disk_nb_mutex,
+ // and that there is no cloned_value_data, because
+ // no one is writing a cloned value out.
+ assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
+ assert(p->cloned_value_data == NULL);
+ cachetable_remove_pair(m_pl, this, p);
+ removed = true;
+ }
+ pair_unlock(p);
m_pl->write_list_unlock();
+ // do not want to hold the write list lock while freeing a pair
+ if (removed) {
+ cachetable_free_pair(p);
+ }
}
//
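Note: here the old cachetable_maybe_remove_and_free_pair helper is inlined so that cachetable_free_pair runs only after both the pair mutex and the write list lock are dropped; the removed flag carries the decision out of the critical section. The pattern, sketched with a std::mutex-protected list standing in for the real pair list:

    #include <mutex>
    #include <list>

    struct node { int key; /* ... payload ... */ };

    static std::mutex list_lock;
    static std::list<node *> live_nodes;

    // Unlink `n` from the shared list under the lock, but run the (potentially
    // expensive) free outside it so other threads are not serialized behind us.
    static void remove_and_free(node *n) {
        bool removed = false;
        {
            std::lock_guard<std::mutex> guard(list_lock);
            live_nodes.remove(n);   // cheap pointer unlink under the lock
            removed = true;         // in the real code this is conditional on
                                    // users() == 0 && refcount == 0
        }
        if (removed) {
            delete n;               // expensive teardown, lock already dropped
        }
    }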
@@ -4348,7 +4419,7 @@ ENSURE_POD(checkpointer);
//
// Sets the cachetable reference in this checkpointer class, this is temporary.
//
-void checkpointer::init(pair_list *_pl,
+int checkpointer::init(pair_list *_pl,
TOKULOGGER _logger,
evictor *_ev,
cachefile_list *files) {
@@ -4359,11 +4430,20 @@ void checkpointer::init(pair_list *_pl,
bjm_init(&m_checkpoint_clones_bjm);
// Default is no checkpointing.
- toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this);
+ m_checkpointer_cron_init = false;
+ int r = toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this);
+ if (r == 0) {
+ m_checkpointer_cron_init = true;
+ }
+ m_checkpointer_init = true;
+ return r;
}
void checkpointer::destroy() {
- if (!this->has_been_shutdown()) {
+ if (!m_checkpointer_init) {
+ return;
+ }
+ if (m_checkpointer_cron_init && !this->has_been_shutdown()) {
// for test code only, production code uses toku_cachetable_minicron_shutdown()
int r = this->shutdown();
assert(r == 0);