diff options
Diffstat (limited to 'storage/tokudb/ft-index/ft/cachetable.cc')
-rw-r--r-- | storage/tokudb/ft-index/ft/cachetable.cc | 262 |
1 files changed, 171 insertions, 91 deletions
diff --git a/storage/tokudb/ft-index/ft/cachetable.cc b/storage/tokudb/ft-index/ft/cachetable.cc index 4bfe8d90379..fb427c5349b 100644 --- a/storage/tokudb/ft-index/ft/cachetable.cc +++ b/storage/tokudb/ft-index/ft/cachetable.cc @@ -106,6 +106,7 @@ PATENT RIGHTS GRANT: #include <portability/toku_time.h> #include <util/rwlock.h> #include <util/status.h> +#include <util/context.h> /////////////////////////////////////////////////////////////////////////////////// // Engine status @@ -291,7 +292,10 @@ uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) { // reserve 25% as "unreservable". The loader cannot have it. #define unreservable_memory(size) ((size)/4) -void toku_cachetable_create(CACHETABLE *result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) { +int toku_cachetable_create(CACHETABLE *ct_result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) { + int result = 0; + int r; + if (size_limit == 0) { size_limit = 128*1024*1024; } @@ -301,16 +305,46 @@ void toku_cachetable_create(CACHETABLE *result, long size_limit, LSN UU(initial_ ct->cf_list.init(); int num_processors = toku_os_get_number_active_processors(); - ct->client_kibbutz = toku_kibbutz_create(num_processors); - ct->ct_kibbutz = toku_kibbutz_create(2*num_processors); int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1; - ct->checkpointing_kibbutz = toku_kibbutz_create(checkpointing_nworkers); + r = toku_kibbutz_create(num_processors, &ct->client_kibbutz); + if (r != 0) { + result = r; + goto cleanup; + } + r = toku_kibbutz_create(2*num_processors, &ct->ct_kibbutz); + if (r != 0) { + result = r; + goto cleanup; + } + r = toku_kibbutz_create(checkpointing_nworkers, &ct->checkpointing_kibbutz); + if (r != 0) { + result = r; + goto cleanup; + } // must be done after creating ct_kibbutz - ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD); - ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list); - ct->cl.init(1, &ct->list, ct); // by default, start with one iteration + r = ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD); + if (r != 0) { + result = r; + goto cleanup; + } + r = ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list); + if (r != 0) { + result = r; + goto cleanup; + } + r = ct->cl.init(1, &ct->list, ct); // by default, start with one iteration + if (r != 0) { + result = r; + goto cleanup; + } ct->env_dir = toku_xstrdup("."); - *result = ct; +cleanup: + if (result == 0) { + *ct_result = ct; + } else { + toku_cachetable_close(&ct); + } + return result; } // Returns a pointer to the checkpoint contained within @@ -618,39 +652,6 @@ static void cachetable_free_pair(PAIR p) { ctpair_destroy(p); } -// Maybe remove a pair from the cachetable and free it, depending on whether -// or not there are any threads interested in the pair. The flush callback -// is called with write_me and keep_me both false, and the pair is destroyed. -// The sole purpose of this function is to remove the node, so the write_me -// argument to the flush callback is false, and the flush callback won't do -// anything except destroy the node. -// -// on input, pair_list's write lock is held and PAIR's mutex is held -// on exit, only the pair_list's write lock is still held -// -static void cachetable_maybe_remove_and_free_pair ( - pair_list* pl, - evictor* ev, - PAIR p - ) -{ - // this ensures that a clone running in the background first completes - if (p->value_rwlock.users() == 0 && p->refcount == 0) { - // assumption is that if we are about to remove the pair - // that no one has grabbed the disk_nb_mutex, - // and that there is no cloned_value_data, because - // no one is writing a cloned value out. - assert(nb_mutex_users(&p->disk_nb_mutex) == 0); - assert(p->cloned_value_data == NULL); - cachetable_remove_pair(pl, ev, p); - pair_unlock(p); - cachetable_free_pair(p); - } - else { - pair_unlock(p); - } -} - // assumes value_rwlock and disk_nb_mutex held on entry // responsibility of this function is to only write a locked PAIR to disk // and NOTHING else. We do not manipulate the state of the PAIR @@ -774,7 +775,7 @@ static void cachetable_evicter(void* extra) { static void cachetable_partial_eviction(void* extra) { PAIR p = (PAIR)extra; CACHEFILE cf = p->cachefile; - p->ev->do_partial_eviction(p, false); + p->ev->do_partial_eviction(p); bjm_remove_background_job(cf->bjm); } @@ -1483,6 +1484,8 @@ static bool try_pin_pair( bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); if (partial_fetch_required) { + toku::context pf_ctx(CTX_PARTIAL_FETCH); + if (ct->ev.should_client_thread_sleep() && !already_slept) { pair_lock(p); unpin_pair(p, (lock_type == PL_READ)); @@ -1634,6 +1637,8 @@ beginning: } } else { + toku::context fetch_ctx(CTX_FULL_FETCH); + ct->list.pair_unlock_by_fullhash(fullhash); // we only want to sleep once per call to get_and_pin. If we have already // slept and there is still cache pressure, then we might as @@ -2038,10 +2043,7 @@ maybe_pin_pair( if (retval == TOKUDB_TRY_AGAIN) { unpin_pair(p, (lock_type == PL_READ)); } - else { - // just a sanity check - assert(retval == 0); - } + pair_touch(p); pair_unlock(p); return retval; } @@ -2071,6 +2073,8 @@ try_again: ct->list.pair_lock_by_fullhash(fullhash); PAIR p = ct->list.find_pair(cf, key, fullhash); if (p == NULL) { + toku::context fetch_ctx(CTX_FULL_FETCH); + // Not found ct->list.pair_unlock_by_fullhash(fullhash); ct->list.write_list_lock(); @@ -2146,6 +2150,8 @@ try_again: // still check for partial fetch bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs); if (partial_fetch_required) { + toku::context fetch_ctx(CTX_PARTIAL_FETCH); + run_unlockers(unlockers); // we are now getting an expensive write lock, because we @@ -2428,10 +2434,10 @@ static void remove_pair_for_close(PAIR p, CACHETABLE ct, bool completely) { assert(p->dirty == CACHETABLE_CLEAN); assert(p->refcount == 0); if (completely) { - // TODO: maybe break up this function - // so that write lock does not need to be held for entire - // free - cachetable_maybe_remove_and_free_pair(&ct->list, &ct->ev, p); + cachetable_remove_pair(&ct->list, &ct->ev, p); + pair_unlock(p); + // TODO: Eventually, we should not hold the write list lock during free + cachetable_free_pair(p); } else { // if we are not evicting completely, @@ -2587,9 +2593,12 @@ void toku_cachetable_close (CACHETABLE *ctp) { ct->list.destroy(); ct->cf_list.destroy(); - toku_kibbutz_destroy(ct->client_kibbutz); - toku_kibbutz_destroy(ct->ct_kibbutz); - toku_kibbutz_destroy(ct->checkpointing_kibbutz); + if (ct->client_kibbutz) + toku_kibbutz_destroy(ct->client_kibbutz); + if (ct->ct_kibbutz) + toku_kibbutz_destroy(ct->ct_kibbutz); + if (ct->checkpointing_kibbutz) + toku_kibbutz_destroy(ct->checkpointing_kibbutz); toku_free(ct->env_dir); toku_free(ct); *ctp = 0; @@ -3074,20 +3083,29 @@ int toku_cleaner_thread (void *cleaner_v) { // ENSURE_POD(cleaner); -void cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) { +int cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) { // default is no cleaner, for now - toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this); + m_cleaner_cron_init = false; + int r = toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this); + if (r == 0) { + m_cleaner_cron_init = true; + } TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_cleaner_iterations, sizeof m_cleaner_iterations); m_cleaner_iterations = _cleaner_iterations; m_pl = _pl; m_ct = _ct; + m_cleaner_init = true; + return r; } // this function is allowed to be called multiple times void cleaner::destroy(void) { - if (!toku_minicron_has_been_shutdown(&m_cleaner_cron)) { + if (!m_cleaner_init) { + return; + } + if (m_cleaner_cron_init && !toku_minicron_has_been_shutdown(&m_cleaner_cron)) { // for test code only, production code uses toku_cachetable_minicron_shutdown() - int r = toku_minicron_shutdown(&m_cleaner_cron); + int r = toku_minicron_shutdown(&m_cleaner_cron); assert(r==0); } } @@ -3122,6 +3140,8 @@ void cleaner::set_period(uint32_t new_period) { // start). At this point, we can safely unlock the cachetable, do the // work (callback), and unlock/release our claim to the cachefile. int cleaner::run_cleaner(void) { + toku::context cleaner_ctx(CTX_CLEANER); + int r; uint32_t num_iterations = this->get_iterations(); for (uint32_t i = 0; i < num_iterations; ++i) { @@ -3662,7 +3682,7 @@ static void *eviction_thread(void *evictor_v) { // Starts the eviction thread, assigns external object references, // and initializes all counters and condition variables. // -void evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) { +int evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) { TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_ev_thread_is_running, sizeof m_ev_thread_is_running); TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_size_evicting, sizeof m_size_evicting); @@ -3716,8 +3736,13 @@ void evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, K // start the background thread m_run_thread = true; m_num_eviction_thread_runs = 0; + m_ev_thread_init = false; r = toku_pthread_create(&m_ev_thread, NULL, eviction_thread, this); - assert_zero(r); + if (r == 0) { + m_ev_thread_init = true; + } + m_evictor_init = true; + return r; } // @@ -3725,7 +3750,10 @@ void evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, K // // NOTE: This should only be called if there are no evictions in progress. // -void evictor::destroy() { +void evictor::destroy() { + if (!m_evictor_init) { + return; + } assert(m_size_evicting == 0); // // commented out of Ming, because we could not finish @@ -3734,16 +3762,16 @@ void evictor::destroy() { //assert(m_size_current == 0); // Stop the eviction thread. - toku_mutex_lock(&m_ev_thread_lock); - m_run_thread = false; - this->signal_eviction_thread(); - toku_mutex_unlock(&m_ev_thread_lock); - - void *ret; - int r = toku_pthread_join(m_ev_thread, &ret); - assert_zero(r); - assert(!m_ev_thread_is_running); - + if (m_ev_thread_init) { + toku_mutex_lock(&m_ev_thread_lock); + m_run_thread = false; + this->signal_eviction_thread(); + toku_mutex_unlock(&m_ev_thread_lock); + void *ret; + int r = toku_pthread_join(m_ev_thread, &ret); + assert_zero(r); + assert(!m_ev_thread_is_running); + } destroy_partitioned_counter(m_size_nonleaf); m_size_nonleaf = NULL; destroy_partitioned_counter(m_size_leaf); @@ -4007,6 +4035,8 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) { m_pl->read_list_unlock(); ret_val = true; if (curr_in_clock->count > 0) { + toku::context pe_ctx(CTX_PARTIAL_EVICTION); + uint32_t curr_size = curr_in_clock->attr.size; // if the size of this PAIR is greater than the average size of PAIRs // in the cachetable, then decrement it, otherwise, decrement @@ -4052,10 +4082,10 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) { write_extraargs ); if (cost == PE_CHEAP) { + pair_unlock(curr_in_clock); curr_in_clock->size_evicting_estimate = 0; - this->do_partial_eviction(curr_in_clock, true); + this->do_partial_eviction(curr_in_clock); bjm_remove_background_job(cf->bjm); - pair_unlock(curr_in_clock); } else if (cost == PE_EXPENSIVE) { // only bother running an expensive partial eviction @@ -4083,6 +4113,8 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) { } } else { + toku::context pe_ctx(CTX_FULL_EVICTION); + // responsibility of try_evict_pair to eventually remove background job // pair's mutex is still grabbed here this->try_evict_pair(curr_in_clock); @@ -4094,26 +4126,48 @@ exit: return ret_val; } +struct pair_unpin_with_new_attr_extra { + pair_unpin_with_new_attr_extra(evictor *e, PAIR p) : + ev(e), pair(p) { + } + evictor *ev; + PAIR pair; +}; + +static void pair_unpin_with_new_attr(PAIR_ATTR new_attr, void *extra) { + struct pair_unpin_with_new_attr_extra *info = + reinterpret_cast<struct pair_unpin_with_new_attr_extra *>(extra); + PAIR p = info->pair; + evictor *ev = info->ev; + + // change the attr in the evictor, then update the value in the pair + ev->change_pair_attr(p->attr, new_attr); + p->attr = new_attr; + + // unpin + pair_lock(p); + p->value_rwlock.write_unlock(); + pair_unlock(p); +} + // -// on entry and exit, pair's mutex is held if pair_mutex_held is true +// on entry and exit, pair's mutex is not held // on exit, PAIR is unpinned // -void evictor::do_partial_eviction(PAIR p, bool pair_mutex_held) { - PAIR_ATTR new_attr; +void evictor::do_partial_eviction(PAIR p) { + // Copy the old attr PAIR_ATTR old_attr = p->attr; - - p->pe_callback(p->value_data, old_attr, &new_attr, p->write_extraargs); + long long size_evicting_estimate = p->size_evicting_estimate; - this->change_pair_attr(old_attr, new_attr); - p->attr = new_attr; - this->decrease_size_evicting(p->size_evicting_estimate); - if (!pair_mutex_held) { - pair_lock(p); - } - p->value_rwlock.write_unlock(); - if (!pair_mutex_held) { - pair_unlock(p); - } + struct pair_unpin_with_new_attr_extra extra(this, p); + p->pe_callback(p->value_data, old_attr, p->write_extraargs, + // passed as the finalize continuation, which allows the + // pe_callback to unpin the node before doing expensive cleanup + pair_unpin_with_new_attr, &extra); + + // now that the pe_callback (and its pair_unpin_with_new_attr continuation) + // have finished, we can safely decrease size_evicting + this->decrease_size_evicting(size_evicting_estimate); } // @@ -4188,8 +4242,25 @@ void evictor::evict_pair(PAIR p, bool for_checkpoint) { nb_mutex_unlock(&p->disk_nb_mutex); // at this point, we have the pair list's write list lock // and we have the pair's mutex (p->mutex) held - cachetable_maybe_remove_and_free_pair(m_pl, this, p); + + // this ensures that a clone running in the background first completes + bool removed = false; + if (p->value_rwlock.users() == 0 && p->refcount == 0) { + // assumption is that if we are about to remove the pair + // that no one has grabbed the disk_nb_mutex, + // and that there is no cloned_value_data, because + // no one is writing a cloned value out. + assert(nb_mutex_users(&p->disk_nb_mutex) == 0); + assert(p->cloned_value_data == NULL); + cachetable_remove_pair(m_pl, this, p); + removed = true; + } + pair_unlock(p); m_pl->write_list_unlock(); + // do not want to hold the write list lock while freeing a pair + if (removed) { + cachetable_free_pair(p); + } } // @@ -4348,7 +4419,7 @@ ENSURE_POD(checkpointer); // // Sets the cachetable reference in this checkpointer class, this is temporary. // -void checkpointer::init(pair_list *_pl, +int checkpointer::init(pair_list *_pl, TOKULOGGER _logger, evictor *_ev, cachefile_list *files) { @@ -4359,11 +4430,20 @@ void checkpointer::init(pair_list *_pl, bjm_init(&m_checkpoint_clones_bjm); // Default is no checkpointing. - toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this); + m_checkpointer_cron_init = false; + int r = toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this); + if (r == 0) { + m_checkpointer_cron_init = true; + } + m_checkpointer_init = true; + return r; } void checkpointer::destroy() { - if (!this->has_been_shutdown()) { + if (!m_checkpointer_init) { + return; + } + if (m_checkpointer_cron_init && !this->has_been_shutdown()) { // for test code only, production code uses toku_cachetable_minicron_shutdown() int r = this->shutdown(); assert(r == 0); |