diff options
Diffstat (limited to 'storage/tokudb/ft-index/src/indexer.cc')
-rw-r--r-- | storage/tokudb/ft-index/src/indexer.cc | 95 |
1 files changed, 83 insertions, 12 deletions
diff --git a/storage/tokudb/ft-index/src/indexer.cc b/storage/tokudb/ft-index/src/indexer.cc index c79169fa1da..d84f4f4ba8f 100644 --- a/storage/tokudb/ft-index/src/indexer.cc +++ b/storage/tokudb/ft-index/src/indexer.cc @@ -49,6 +49,7 @@ UNIVERSITY PATENT NOTICE: PATENT MARKING NOTICE: This software is covered by US Patent No. 8,185,551. + This software is covered by US Patent No. 8,489,638. PATENT RIGHTS GRANT: @@ -189,6 +190,8 @@ static void free_indexer_resources(DB_INDEXER *indexer) { if ( indexer->i ) { toku_mutex_destroy(&indexer->i->indexer_lock); + toku_mutex_destroy(&indexer->i->indexer_estimate_lock); + toku_destroy_dbt(&indexer->i->position_estimate); if ( indexer->i->lec ) { toku_le_cursor_close(indexer->i->lec); } @@ -222,8 +225,51 @@ toku_indexer_unlock(DB_INDEXER* indexer) { toku_mutex_unlock(&indexer->i->indexer_lock); } +// a shortcut call +// +// a cheap(er) call to see if a key must be inserted +// into the DB. If true, then we know we have to insert. +// If false, then we don't know, and have to check again +// after grabbing the indexer lock +bool +toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key) { + bool retval = false; + toku_mutex_lock(&indexer->i->indexer_estimate_lock); + // if we have no position estimate, we can't tell, so return false + if (indexer->i->position_estimate.data == NULL) { + retval = false; + } + else { + FT_HANDLE ft_handle = indexer->i->src_db->i->ft_handle; + ft_compare_func keycompare = toku_ft_get_bt_compare(ft_handle); + int r = keycompare( + indexer->i->src_db, + &indexer->i->position_estimate, + key + ); + // if key > position_estimate, then we know the indexer cursor + // is past key, and we can safely say that associated values of + // key must be inserted into the indexer's db + if (r < 0) { + retval = true; + } + else { + retval = false; + } + } + toku_mutex_unlock(&indexer->i->indexer_estimate_lock); + return retval; +} + +void +toku_indexer_update_estimate(DB_INDEXER* indexer) { + toku_mutex_lock(&indexer->i->indexer_estimate_lock); + toku_le_cursor_update_estimate(indexer->i->lec, &indexer->i->position_estimate); + toku_mutex_unlock(&indexer->i->indexer_estimate_lock); +} + // forward declare the test-only wrapper function for undo-do -static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule); +static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule); int toku_indexer_create_indexer(DB_ENV *env, @@ -272,6 +318,8 @@ toku_indexer_create_indexer(DB_ENV *env, indexer->abort = abort_indexer; toku_mutex_init(&indexer->i->indexer_lock, NULL); + toku_mutex_init(&indexer->i->indexer_estimate_lock, NULL); + toku_init_dbt(&indexer->i->position_estimate); // // create and close a dummy loader to get redirection going for the hot indexer @@ -356,17 +404,26 @@ toku_indexer_set_error_callback(DB_INDEXER *indexer, // a key is to the right of the indexer's cursor if it compares // greater than the current le cursor position. bool -toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key) { - return toku_le_cursor_is_key_greater(indexer->i->lec, key); +toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key) { + // the hot indexer runs from the end to the beginning, it gets the largest keys first + // + // if key is less than indexer's position, then we should NOT insert it because + // the indexer will get to it. If it is greater or equal, that means the indexer + // has already processed the key, and will not get to it, therefore, we need + // to handle it + return toku_le_cursor_is_key_greater_or_equal(indexer->i->lec, key); } // initialize provisional info by allocating enough space to hold provisional // ids, states, and txns for each of the provisional entries in the ule. the // ule and le remain owned by the caller, not this struct. static void -ule_prov_info_init(struct ule_prov_info *prov_info, LEAFENTRY le, ULEHANDLE ule) { +ule_prov_info_init(struct ule_prov_info *prov_info, const void* key, uint32_t keylen, LEAFENTRY le, ULEHANDLE ule) { prov_info->le = le; prov_info->ule = ule; + prov_info->keylen = keylen; + prov_info->key = toku_xmalloc(keylen); + memcpy(prov_info->key, key, keylen); prov_info->num_provisional = ule_get_num_provisional(ule); prov_info->num_committed = ule_get_num_committed(ule); uint32_t n = prov_info->num_provisional; @@ -489,7 +546,7 @@ struct le_cursor_extra { // cachetable pair locks. because no txn can commit on this db, read // the provisional info for the newly read ule. static int -le_cursor_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN UU(vallen), bytevec val, void *extra, bool lock_only) { +le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN UU(vallen), bytevec val, void *extra, bool lock_only) { if (lock_only || val == NULL) { ; // do nothing if only locking. do nothing if val==NULL, means DB_NOTFOUND } else { @@ -504,7 +561,7 @@ le_cursor_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN UU(vallen), byte // when we initialize prov info, we also pass in the leafentry and ule // pointers so the caller can access them later. it's their job to free // them when they're not needed. - ule_prov_info_init(prov_info, le, ule); + ule_prov_info_init(prov_info, key, keylen, le, ule); indexer_fill_prov_info(cursor_extra->indexer, prov_info); } return 0; @@ -558,14 +615,15 @@ build_index(DB_INDEXER *indexer) { else { invariant(prov_info.le); invariant(prov_info.ule); - ULEHANDLE ule = prov_info.ule; for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) { DB *db = indexer->i->dest_dbs[which_db]; - result = indexer_undo_do(indexer, db, ule, &prov_info); + DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db]; + DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db]; + result = indexer_undo_do(indexer, db, &prov_info, hot_keys, hot_vals); if ((result != 0) && (indexer->i->error_callback != NULL)) { // grab the key and call the error callback DBT key; toku_init_dbt_flags(&key, DB_DBT_REALLOC); - toku_dbt_set(ule_get_keylen(ule), ule_get_key(ule), &key, NULL); + toku_dbt_set(prov_info.keylen, prov_info.key, &key, NULL); indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra); toku_destroy_dbt(&key); } @@ -573,6 +631,7 @@ build_index(DB_INDEXER *indexer) { // the leafentry and ule are not owned by the prov_info, // and are still our responsibility to free toku_free(prov_info.le); + toku_free(prov_info.key); toku_ule_free(prov_info.ule); } @@ -709,13 +768,25 @@ toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) { // this allows us to call the undo do function in tests using // a convenience wrapper that gets and destroys the ule's prov info static int -test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) { +test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule) { + int which_db; + for (which_db = 0; which_db < indexer->i->N; which_db++) { + if (indexer->i->dest_dbs[which_db] == hotdb) { + break; + } + } + if (which_db == indexer->i->N) { + return EINVAL; + } struct ule_prov_info prov_info; memset(&prov_info, 0, sizeof(prov_info)); // pass null for the leafentry - we don't need it, neither does the info - ule_prov_info_init(&prov_info, NULL, ule); + ule_prov_info_init(&prov_info, key->data, key->size, NULL, ule); // mallocs prov_info->key, owned by this function indexer_fill_prov_info(indexer, &prov_info); - int r = indexer_undo_do(indexer, hotdb, ule, &prov_info); + DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db]; + DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db]; + int r = indexer_undo_do(indexer, hotdb, &prov_info, hot_keys, hot_vals); + toku_free(prov_info.key); ule_prov_info_destroy(&prov_info); return r; } |