summaryrefslogtreecommitdiff
path: root/storage/tokudb/ft-index/src/indexer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/ft-index/src/indexer.cc')
-rw-r--r--storage/tokudb/ft-index/src/indexer.cc95
1 files changed, 83 insertions, 12 deletions
diff --git a/storage/tokudb/ft-index/src/indexer.cc b/storage/tokudb/ft-index/src/indexer.cc
index c79169fa1da..d84f4f4ba8f 100644
--- a/storage/tokudb/ft-index/src/indexer.cc
+++ b/storage/tokudb/ft-index/src/indexer.cc
@@ -49,6 +49,7 @@ UNIVERSITY PATENT NOTICE:
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
+ This software is covered by US Patent No. 8,489,638.
PATENT RIGHTS GRANT:
@@ -189,6 +190,8 @@ static void
free_indexer_resources(DB_INDEXER *indexer) {
if ( indexer->i ) {
toku_mutex_destroy(&indexer->i->indexer_lock);
+ toku_mutex_destroy(&indexer->i->indexer_estimate_lock);
+ toku_destroy_dbt(&indexer->i->position_estimate);
if ( indexer->i->lec ) {
toku_le_cursor_close(indexer->i->lec);
}
@@ -222,8 +225,51 @@ toku_indexer_unlock(DB_INDEXER* indexer) {
toku_mutex_unlock(&indexer->i->indexer_lock);
}
+// a shortcut call
+//
+// a cheap(er) call to see if a key must be inserted
+// into the DB. If true, then we know we have to insert.
+// If false, then we don't know, and have to check again
+// after grabbing the indexer lock
+bool
+toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key) {
+ bool retval = false;
+ toku_mutex_lock(&indexer->i->indexer_estimate_lock);
+ // if we have no position estimate, we can't tell, so return false
+ if (indexer->i->position_estimate.data == NULL) {
+ retval = false;
+ }
+ else {
+ FT_HANDLE ft_handle = indexer->i->src_db->i->ft_handle;
+ ft_compare_func keycompare = toku_ft_get_bt_compare(ft_handle);
+ int r = keycompare(
+ indexer->i->src_db,
+ &indexer->i->position_estimate,
+ key
+ );
+ // if key > position_estimate, then we know the indexer cursor
+ // is past key, and we can safely say that associated values of
+ // key must be inserted into the indexer's db
+ if (r < 0) {
+ retval = true;
+ }
+ else {
+ retval = false;
+ }
+ }
+ toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
+ return retval;
+}
+
+void
+toku_indexer_update_estimate(DB_INDEXER* indexer) {
+ toku_mutex_lock(&indexer->i->indexer_estimate_lock);
+ toku_le_cursor_update_estimate(indexer->i->lec, &indexer->i->position_estimate);
+ toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
+}
+
// forward declare the test-only wrapper function for undo-do
-static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
+static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule);
int
toku_indexer_create_indexer(DB_ENV *env,
@@ -272,6 +318,8 @@ toku_indexer_create_indexer(DB_ENV *env,
indexer->abort = abort_indexer;
toku_mutex_init(&indexer->i->indexer_lock, NULL);
+ toku_mutex_init(&indexer->i->indexer_estimate_lock, NULL);
+ toku_init_dbt(&indexer->i->position_estimate);
//
// create and close a dummy loader to get redirection going for the hot indexer
@@ -356,17 +404,26 @@ toku_indexer_set_error_callback(DB_INDEXER *indexer,
// a key is to the right of the indexer's cursor if it compares
// greater than the current le cursor position.
bool
-toku_indexer_is_key_right_of_le_cursor(DB_INDEXER *indexer, const DBT *key) {
- return toku_le_cursor_is_key_greater(indexer->i->lec, key);
+toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key) {
+ // the hot indexer runs from the end to the beginning, it gets the largest keys first
+ //
+ // if key is less than indexer's position, then we should NOT insert it because
+ // the indexer will get to it. If it is greater or equal, that means the indexer
+ // has already processed the key, and will not get to it, therefore, we need
+ // to handle it
+ return toku_le_cursor_is_key_greater_or_equal(indexer->i->lec, key);
}
// initialize provisional info by allocating enough space to hold provisional
// ids, states, and txns for each of the provisional entries in the ule. the
// ule and le remain owned by the caller, not this struct.
static void
-ule_prov_info_init(struct ule_prov_info *prov_info, LEAFENTRY le, ULEHANDLE ule) {
+ule_prov_info_init(struct ule_prov_info *prov_info, const void* key, uint32_t keylen, LEAFENTRY le, ULEHANDLE ule) {
prov_info->le = le;
prov_info->ule = ule;
+ prov_info->keylen = keylen;
+ prov_info->key = toku_xmalloc(keylen);
+ memcpy(prov_info->key, key, keylen);
prov_info->num_provisional = ule_get_num_provisional(ule);
prov_info->num_committed = ule_get_num_committed(ule);
uint32_t n = prov_info->num_provisional;
@@ -489,7 +546,7 @@ struct le_cursor_extra {
// cachetable pair locks. because no txn can commit on this db, read
// the provisional info for the newly read ule.
static int
-le_cursor_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN UU(vallen), bytevec val, void *extra, bool lock_only) {
+le_cursor_callback(ITEMLEN keylen, bytevec key, ITEMLEN UU(vallen), bytevec val, void *extra, bool lock_only) {
if (lock_only || val == NULL) {
; // do nothing if only locking. do nothing if val==NULL, means DB_NOTFOUND
} else {
@@ -504,7 +561,7 @@ le_cursor_callback(ITEMLEN UU(keylen), bytevec UU(key), ITEMLEN UU(vallen), byte
// when we initialize prov info, we also pass in the leafentry and ule
// pointers so the caller can access them later. it's their job to free
// them when they're not needed.
- ule_prov_info_init(prov_info, le, ule);
+ ule_prov_info_init(prov_info, key, keylen, le, ule);
indexer_fill_prov_info(cursor_extra->indexer, prov_info);
}
return 0;
@@ -558,14 +615,15 @@ build_index(DB_INDEXER *indexer) {
else {
invariant(prov_info.le);
invariant(prov_info.ule);
- ULEHANDLE ule = prov_info.ule;
for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) {
DB *db = indexer->i->dest_dbs[which_db];
- result = indexer_undo_do(indexer, db, ule, &prov_info);
+ DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db];
+ DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db];
+ result = indexer_undo_do(indexer, db, &prov_info, hot_keys, hot_vals);
if ((result != 0) && (indexer->i->error_callback != NULL)) {
// grab the key and call the error callback
DBT key; toku_init_dbt_flags(&key, DB_DBT_REALLOC);
- toku_dbt_set(ule_get_keylen(ule), ule_get_key(ule), &key, NULL);
+ toku_dbt_set(prov_info.keylen, prov_info.key, &key, NULL);
indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra);
toku_destroy_dbt(&key);
}
@@ -573,6 +631,7 @@ build_index(DB_INDEXER *indexer) {
// the leafentry and ule are not owned by the prov_info,
// and are still our responsibility to free
toku_free(prov_info.le);
+ toku_free(prov_info.key);
toku_ule_free(prov_info.ule);
}
@@ -709,13 +768,25 @@ toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) {
// this allows us to call the undo do function in tests using
// a convenience wrapper that gets and destroys the ule's prov info
static int
-test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
+test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule) {
+ int which_db;
+ for (which_db = 0; which_db < indexer->i->N; which_db++) {
+ if (indexer->i->dest_dbs[which_db] == hotdb) {
+ break;
+ }
+ }
+ if (which_db == indexer->i->N) {
+ return EINVAL;
+ }
struct ule_prov_info prov_info;
memset(&prov_info, 0, sizeof(prov_info));
// pass null for the leafentry - we don't need it, neither does the info
- ule_prov_info_init(&prov_info, NULL, ule);
+ ule_prov_info_init(&prov_info, key->data, key->size, NULL, ule); // mallocs prov_info->key, owned by this function
indexer_fill_prov_info(indexer, &prov_info);
- int r = indexer_undo_do(indexer, hotdb, ule, &prov_info);
+ DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db];
+ DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db];
+ int r = indexer_undo_do(indexer, hotdb, &prov_info, hot_keys, hot_vals);
+ toku_free(prov_info.key);
ule_prov_info_destroy(&prov_info);
return r;
}