diff options
Diffstat (limited to 'db')
-rw-r--r-- | db/c.cc | 110 | ||||
-rw-r--r-- | db/c_test.c | 77 | ||||
-rw-r--r-- | db/db_bench.cc | 99 | ||||
-rw-r--r-- | db/db_impl.cc | 12 | ||||
-rw-r--r-- | db/db_impl.h | 2 | ||||
-rw-r--r-- | db/db_test.cc | 1050 | ||||
-rw-r--r-- | db/dbformat.cc | 20 | ||||
-rw-r--r-- | db/dbformat.h | 12 | ||||
-rw-r--r-- | db/repair.cc | 4 | ||||
-rw-r--r-- | db/table_cache.cc | 58 | ||||
-rw-r--r-- | db/table_cache.h | 11 | ||||
-rw-r--r-- | db/version_set.cc | 83 |
12 files changed, 1045 insertions, 493 deletions
@@ -10,6 +10,7 @@ #include "leveldb/comparator.h" #include "leveldb/db.h" #include "leveldb/env.h" +#include "leveldb/filter_policy.h" #include "leveldb/iterator.h" #include "leveldb/options.h" #include "leveldb/status.h" @@ -21,8 +22,10 @@ using leveldb::CompressionType; using leveldb::DB; using leveldb::Env; using leveldb::FileLock; +using leveldb::FilterPolicy; using leveldb::Iterator; using leveldb::Logger; +using leveldb::NewBloomFilterPolicy; using leveldb::NewLRUCache; using leveldb::Options; using leveldb::RandomAccessFile; @@ -78,6 +81,47 @@ struct leveldb_comparator_t : public Comparator { virtual void FindShortSuccessor(std::string* key) const { } }; +struct leveldb_filterpolicy_t : public FilterPolicy { + void* state_; + void (*destructor_)(void*); + const char* (*name_)(void*); + char* (*create_)( + void*, + const char* const* key_array, const size_t* key_length_array, + int num_keys, + size_t* filter_length); + unsigned char (*key_match_)( + void*, + const char* key, size_t length, + const char* filter, size_t filter_length); + + virtual ~leveldb_filterpolicy_t() { + (*destructor_)(state_); + } + + virtual const char* Name() const { + return (*name_)(state_); + } + + virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const { + std::vector<const char*> key_pointers(n); + std::vector<size_t> key_sizes(n); + for (int i = 0; i < n; i++) { + key_pointers[i] = keys[i].data(); + key_sizes[i] = keys[i].size(); + } + size_t len; + char* filter = (*create_)(state_, &key_pointers[0], &key_sizes[0], n, &len); + dst->append(filter, len); + free(filter); + } + + virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const { + return (*key_match_)(state_, key.data(), key.size(), + filter.data(), filter.size()); + } +}; + struct leveldb_env_t { Env* rep; bool is_default; @@ -218,6 +262,17 @@ void leveldb_approximate_sizes( delete[] ranges; } +void leveldb_compact_range( + leveldb_t* db, + const char* start_key, size_t start_key_len, + const char* limit_key, size_t limit_key_len) { + Slice a, b; + db->rep->CompactRange( + // Pass NULL Slice if corresponding "const char*" is NULL + (start_key ? (a = Slice(start_key, start_key_len), &a) : NULL), + (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : NULL)); +} + void leveldb_destroy_db( const leveldb_options_t* options, const char* name, @@ -340,6 +395,12 @@ void leveldb_options_set_comparator( opt->rep.comparator = cmp; } +void leveldb_options_set_filter_policy( + leveldb_options_t* opt, + leveldb_filterpolicy_t* policy) { + opt->rep.filter_policy = policy; +} + void leveldb_options_set_create_if_missing( leveldb_options_t* opt, unsigned char v) { opt->rep.create_if_missing = v; @@ -407,6 +468,55 @@ void leveldb_comparator_destroy(leveldb_comparator_t* cmp) { delete cmp; } +leveldb_filterpolicy_t* leveldb_filterpolicy_create( + void* state, + void (*destructor)(void*), + char* (*create_filter)( + void*, + const char* const* key_array, const size_t* key_length_array, + int num_keys, + size_t* filter_length), + unsigned char (*key_may_match)( + void*, + const char* key, size_t length, + const char* filter, size_t filter_length), + const char* (*name)(void*)) { + leveldb_filterpolicy_t* result = new leveldb_filterpolicy_t; + result->state_ = state; + result->destructor_ = destructor; + result->create_ = create_filter; + result->key_match_ = key_may_match; + result->name_ = name; + return result; +} + +void leveldb_filterpolicy_destroy(leveldb_filterpolicy_t* filter) { + delete filter; +} + +leveldb_filterpolicy_t* leveldb_filterpolicy_create_bloom(int bits_per_key) { + // Make a leveldb_filterpolicy_t, but override all of its methods so + // they delegate to a NewBloomFilterPolicy() instead of user + // supplied C functions. + struct Wrapper : public leveldb_filterpolicy_t { + const FilterPolicy* rep_; + ~Wrapper() { delete rep_; } + const char* Name() const { return rep_->Name(); } + void CreateFilter(const Slice* keys, int n, std::string* dst) const { + return rep_->CreateFilter(keys, n, dst); + } + bool KeyMayMatch(const Slice& key, const Slice& filter) const { + return rep_->KeyMayMatch(key, filter); + } + static void DoNothing(void*) { } + }; + Wrapper* wrapper = new Wrapper; + wrapper->rep_ = NewBloomFilterPolicy(bits_per_key); + wrapper->state_ = NULL; + wrapper->destructor_ = &Wrapper::DoNothing; + return wrapper; +} + leveldb_readoptions_t* leveldb_readoptions_create() { return new leveldb_readoptions_t; } diff --git a/db/c_test.c b/db/c_test.c index 9fef325..12b4424 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -122,6 +122,31 @@ static const char* CmpName(void* arg) { return "foo"; } +// Custom filter policy +static unsigned char fake_filter_result = 1; +static void FilterDestroy(void* arg) { } +static const char* FilterName(void* arg) { + return "TestFilter"; +} +static char* FilterCreate( + void* arg, + const char* const* key_array, const size_t* key_length_array, + int num_keys, + size_t* filter_length) { + *filter_length = 4; + char* result = malloc(4); + memcpy(result, "fake", 4); + return result; +} +unsigned char FilterKeyMatch( + void* arg, + const char* key, size_t length, + const char* filter, size_t filter_length) { + CheckCondition(filter_length == 4); + CheckCondition(memcmp(filter, "fake", 4) == 0); + return fake_filter_result; +} + int main(int argc, char** argv) { leveldb_t* db; leveldb_comparator_t* cmp; @@ -131,6 +156,7 @@ int main(int argc, char** argv) { leveldb_readoptions_t* roptions; leveldb_writeoptions_t* woptions; char* err = NULL; + int run = -1; snprintf(dbname, sizeof(dbname), "/tmp/leveldb_c_test-%d", ((int) geteuid())); @@ -180,6 +206,14 @@ int main(int argc, char** argv) { CheckNoError(err); CheckGet(db, roptions, "foo", "hello"); + StartPhase("compactall"); + leveldb_compact_range(db, NULL, 0, NULL, 0); + CheckGet(db, roptions, "foo", "hello"); + + StartPhase("compactrange"); + leveldb_compact_range(db, "a", 1, "z", 1); + CheckGet(db, roptions, "foo", "hello"); + StartPhase("writebatch"); { leveldb_writebatch_t* wb = leveldb_writebatch_create(); @@ -279,6 +313,49 @@ int main(int argc, char** argv) { CheckGet(db, roptions, "foo", NULL); CheckGet(db, roptions, "bar", NULL); CheckGet(db, roptions, "box", "c"); + leveldb_options_set_create_if_missing(options, 1); + leveldb_options_set_error_if_exists(options, 1); + } + + StartPhase("filter"); + for (run = 0; run < 2; run++) { + // First run uses custom filter, second run uses bloom filter + CheckNoError(err); + leveldb_filterpolicy_t* policy; + if (run == 0) { + policy = leveldb_filterpolicy_create( + NULL, FilterDestroy, FilterCreate, FilterKeyMatch, FilterName); + } else { + policy = leveldb_filterpolicy_create_bloom(10); + } + + // Create new database + leveldb_close(db); + leveldb_destroy_db(options, dbname, &err); + leveldb_options_set_filter_policy(options, policy); + db = leveldb_open(options, dbname, &err); + CheckNoError(err); + leveldb_put(db, woptions, "foo", 3, "foovalue", 8, &err); + CheckNoError(err); + leveldb_put(db, woptions, "bar", 3, "barvalue", 8, &err); + CheckNoError(err); + leveldb_compact_range(db, NULL, 0, NULL, 0); + + fake_filter_result = 1; + CheckGet(db, roptions, "foo", "foovalue"); + CheckGet(db, roptions, "bar", "barvalue"); + if (phase == 0) { + // Must not find value when custom filter returns false + fake_filter_result = 0; + CheckGet(db, roptions, "foo", NULL); + CheckGet(db, roptions, "bar", NULL); + fake_filter_result = 1; + + CheckGet(db, roptions, "foo", "foovalue"); + CheckGet(db, roptions, "bar", "barvalue"); + } + leveldb_options_set_filter_policy(options, NULL); + leveldb_filterpolicy_destroy(policy); } StartPhase("cleanup"); diff --git a/db/db_bench.cc b/db/db_bench.cc index bbfd618..b0c3995 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -25,15 +25,20 @@ // overwrite -- overwrite N values in random key order in async mode // fillsync -- write N/100 values in random key order in sync mode // fill100K -- write N/1000 100K values in random order in async mode +// deleteseq -- delete N keys in sequential order +// deleterandom -- delete N keys in random order // readseq -- read N times sequentially // readreverse -- read N times in reverse order // readrandom -- read N times in random order +// readmissing -- read N missing keys in random order // readhot -- read N times in random order from 1% section of DB +// seekrandom -- N random seeks // crc32c -- repeated crc32c of 4K of data // acquireload -- load N*1000 times // Meta operations: // compact -- Compact the entire DB // stats -- Print DB stats +// sstables -- Print sstable info // heapprofile -- Dump a heap profile (if supported by this port) static const char* FLAGS_benchmarks = "fillseq," @@ -85,6 +90,10 @@ static int FLAGS_cache_size = -1; // Maximum number of files to keep open at the same time (use default if == 0) static int FLAGS_open_files = 0; +// Bloom filter bits per key. +// Negative means use default settings. +static int FLAGS_bloom_bits = -1; + // If true, do not destroy the existing database. If you set this // flag and also specify a benchmark that wants a fresh database, that // benchmark will fail. @@ -293,6 +302,7 @@ struct ThreadState { class Benchmark { private: Cache* cache_; + const FilterPolicy* filter_policy_; DB* db_; int num_; int value_size_; @@ -378,6 +388,9 @@ class Benchmark { public: Benchmark() : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL), + filter_policy_(FLAGS_bloom_bits >= 0 + ? NewBloomFilterPolicy(FLAGS_bloom_bits) + : NULL), db_(NULL), num_(FLAGS_num), value_size_(FLAGS_value_size), @@ -399,6 +412,7 @@ class Benchmark { ~Benchmark() { delete db_; delete cache_; + delete filter_policy_; } void Run() { @@ -457,11 +471,19 @@ class Benchmark { method = &Benchmark::ReadReverse; } else if (name == Slice("readrandom")) { method = &Benchmark::ReadRandom; + } else if (name == Slice("readmissing")) { + method = &Benchmark::ReadMissing; + } else if (name == Slice("seekrandom")) { + method = &Benchmark::SeekRandom; } else if (name == Slice("readhot")) { method = &Benchmark::ReadHot; } else if (name == Slice("readrandomsmall")) { reads_ /= 1000; method = &Benchmark::ReadRandom; + } else if (name == Slice("deleteseq")) { + method = &Benchmark::DeleteSeq; + } else if (name == Slice("deleterandom")) { + method = &Benchmark::DeleteRandom; } else if (name == Slice("readwhilewriting")) { num_threads++; // Add extra thread for writing method = &Benchmark::ReadWhileWriting; @@ -478,7 +500,9 @@ class Benchmark { } else if (name == Slice("heapprofile")) { HeapProfile(); } else if (name == Slice("stats")) { - PrintStats(); + PrintStats("leveldb.stats"); + } else if (name == Slice("sstables")) { + PrintStats("leveldb.sstables"); } else { if (name != Slice()) { // No error message for empty name fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str()); @@ -669,6 +693,7 @@ class Benchmark { options.create_if_missing = !FLAGS_use_existing_db; options.block_cache = cache_; options.write_buffer_size = FLAGS_write_buffer_size; + options.filter_policy = filter_policy_; Status s = DB::Open(options, FLAGS_db, &db_); if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); @@ -743,10 +768,28 @@ class Benchmark { void ReadRandom(ThreadState* thread) { ReadOptions options; std::string value; + int found = 0; for (int i = 0; i < reads_; i++) { char key[100]; const int k = thread->rand.Next() % FLAGS_num; snprintf(key, sizeof(key), "%016d", k); + if (db_->Get(options, key, &value).ok()) { + found++; + } + thread->stats.FinishedSingleOp(); + } + char msg[100]; + snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); + thread->stats.AddMessage(msg); + } + + void ReadMissing(ThreadState* thread) { + ReadOptions options; + std::string value; + for (int i = 0; i < reads_; i++) { + char key[100]; + const int k = thread->rand.Next() % FLAGS_num; + snprintf(key, sizeof(key), "%016d.", k); db_->Get(options, key, &value); thread->stats.FinishedSingleOp(); } @@ -765,6 +808,54 @@ class Benchmark { } } + void SeekRandom(ThreadState* thread) { + ReadOptions options; + std::string value; + int found = 0; + for (int i = 0; i < reads_; i++) { + Iterator* iter = db_->NewIterator(options); + char key[100]; + const int k = thread->rand.Next() % FLAGS_num; + snprintf(key, sizeof(key), "%016d", k); + iter->Seek(key); + if (iter->Valid() && iter->key() == key) found++; + delete iter; + thread->stats.FinishedSingleOp(); + } + char msg[100]; + snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); + thread->stats.AddMessage(msg); + } + + void DoDelete(ThreadState* thread, bool seq) { + RandomGenerator gen; + WriteBatch batch; + Status s; + for (int i = 0; i < num_; i += entries_per_batch_) { + batch.Clear(); + for (int j = 0; j < entries_per_batch_; j++) { + const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num); + char key[100]; + snprintf(key, sizeof(key), "%016d", k); + batch.Delete(key); + thread->stats.FinishedSingleOp(); + } + s = db_->Write(write_options_, &batch); + if (!s.ok()) { + fprintf(stderr, "del error: %s\n", s.ToString().c_str()); + exit(1); + } + } + } + + void DeleteSeq(ThreadState* thread) { + DoDelete(thread, true); + } + + void DeleteRandom(ThreadState* thread) { + DoDelete(thread, false); + } + void ReadWhileWriting(ThreadState* thread) { if (thread->tid > 0) { ReadRandom(thread); @@ -799,9 +890,9 @@ class Benchmark { db_->CompactRange(NULL, NULL); } - void PrintStats() { + void PrintStats(const char* key) { std::string stats; - if (!db_->GetProperty("leveldb.stats", &stats)) { + if (!db_->GetProperty(key, &stats)) { stats = "(failed)"; } fprintf(stdout, "\n%s\n", stats.c_str()); @@ -861,6 +952,8 @@ int main(int argc, char** argv) { FLAGS_write_buffer_size = n; } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) { FLAGS_cache_size = n; + } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) { + FLAGS_bloom_bits = n; } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) { FLAGS_open_files = n; } else if (strncmp(argv[i], "--db=", 5) == 0) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 88d17e7..c9c9023 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -87,12 +87,14 @@ static void ClipToRange(T* ptr, V minvalue, V maxvalue) { } Options SanitizeOptions(const std::string& dbname, const InternalKeyComparator* icmp, + const InternalFilterPolicy* ipolicy, const Options& src) { Options result = src; result.comparator = icmp; - ClipToRange(&result.max_open_files, 20, 50000); - ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); - ClipToRange(&result.block_size, 1<<10, 4<<20); + result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL; + ClipToRange(&result.max_open_files, 20, 50000); + ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); + ClipToRange(&result.block_size, 1<<10, 4<<20); if (result.info_log == NULL) { // Open a log file in the same directory as the db src.env->CreateDir(dbname); // In case it does not exist @@ -112,7 +114,9 @@ Options SanitizeOptions(const std::string& dbname, DBImpl::DBImpl(const Options& options, const std::string& dbname) : env_(options.env), internal_comparator_(options.comparator), - options_(SanitizeOptions(dbname, &internal_comparator_, options)), + internal_filter_policy_(options.filter_policy), + options_(SanitizeOptions( + dbname, &internal_comparator_, &internal_filter_policy_, options)), owns_info_log_(options_.info_log != options.info_log), owns_cache_(options_.block_cache != options.block_cache), dbname_(dbname), diff --git a/db/db_impl.h b/db/db_impl.h index e665c0e..2f8b523 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -105,6 +105,7 @@ class DBImpl : public DB { // Constant after construction Env* const env_; const InternalKeyComparator internal_comparator_; + const InternalFilterPolicy internal_filter_policy_; const Options options_; // options_.comparator == &internal_comparator_ bool owns_info_log_; bool owns_cache_; @@ -185,6 +186,7 @@ class DBImpl : public DB { // it is not equal to src.info_log. extern Options SanitizeOptions(const std::string& db, const InternalKeyComparator* icmp, + const InternalFilterPolicy* ipolicy, const Options& src); } // namespace leveldb diff --git a/db/db_test.cc b/db/db_test.cc index 8318885..ee10807 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3,12 +3,15 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "leveldb/db.h" +#include "leveldb/filter_policy.h" #include "db/db_impl.h" #include "db/filename.h" #include "db/version_set.h" #include "db/write_batch_internal.h" +#include "leveldb/cache.h" #include "leveldb/env.h" #include "leveldb/table.h" +#include "util/hash.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/testharness.h" @@ -22,6 +25,28 @@ static std::string RandomString(Random* rnd, int len) { return r; } +namespace { +class AtomicCounter { + private: + port::Mutex mu_; + int count_; + public: + AtomicCounter() : count_(0) { } + void Increment() { + MutexLock l(&mu_); + count_++; + } + int Read() { + MutexLock l(&mu_); + return count_; + } + void Reset() { + MutexLock l(&mu_); + count_ = 0; + } +}; +} + // Special Env used to delay background operations class SpecialEnv : public EnvWrapper { public: @@ -31,9 +56,13 @@ class SpecialEnv : public EnvWrapper { // Simulate no-space errors while this pointer is non-NULL. port::AtomicPointer no_space_; + bool count_random_reads_; + AtomicCounter random_read_counter_; + explicit SpecialEnv(Env* base) : EnvWrapper(base) { delay_sstable_sync_.Release_Store(NULL); no_space_.Release_Store(NULL); + count_random_reads_ = false; } Status NewWritableFile(const std::string& f, WritableFile** r) { @@ -74,9 +103,44 @@ class SpecialEnv : public EnvWrapper { } return s; } + + Status NewRandomAccessFile(const std::string& f, RandomAccessFile** r) { + class CountingFile : public RandomAccessFile { + private: + RandomAccessFile* target_; + AtomicCounter* counter_; + public: + CountingFile(RandomAccessFile* target, AtomicCounter* counter) + : target_(target), counter_(counter) { + } + virtual ~CountingFile() { delete target_; } + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + counter_->Increment(); + return target_->Read(offset, n, result, scratch); + } + }; + + Status s = target()->NewRandomAccessFile(f, r); + if (s.ok() && count_random_reads_) { + *r = new CountingFile(*r, &random_read_counter_); + } + return s; + } }; class DBTest { + private: + const FilterPolicy* filter_policy_; + + // Sequence of option configurations to try + enum OptionConfig { + kDefault, + kFilter, + kEnd + }; + int option_config_; + public: std::string dbname_; SpecialEnv* env_; @@ -84,7 +148,9 @@ class DBTest { Options last_options_; - DBTest() : env_(new SpecialEnv(Env::Default())) { + DBTest() : option_config_(kDefault), + env_(new SpecialEnv(Env::Default())) { + filter_policy_ = NewBloomFilterPolicy(10); dbname_ = test::TmpDir() + "/db_test"; DestroyDB(dbname_, Options()); db_ = NULL; @@ -95,6 +161,32 @@ class DBTest { delete db_; DestroyDB(dbname_, Options()); delete env_; + delete filter_policy_; + } + + // Switch to a fresh database with the next option configuration to + // test. Return false if there are no more configurations to test. + bool ChangeOptions() { + if (option_config_ == kEnd) { + return false; + } else { + option_config_++; + DestroyAndReopen(); + return true; + } + } + + // Return the current option configuration. + Options CurrentOptions() { + Options options; + switch (option_config_) { + case kFilter: + options.filter_policy = filter_policy_; + break; + default: + break; + } + return options; } DBImpl* dbfull() { @@ -105,6 +197,11 @@ class DBTest { ASSERT_OK(TryReopen(options)); } + void Close() { + delete db_; + db_ = NULL; + } + void DestroyAndReopen(Options* options = NULL) { delete db_; db_ = NULL; @@ -119,6 +216,7 @@ class DBTest { if (options != NULL) { opts = *options; } else { + opts = CurrentOptions(); opts.create_if_missing = true; } last_options_ = opts; @@ -189,8 +287,7 @@ class DBTest { if (!ParseInternalKey(iter->key(), &ikey)) { result += "CORRUPTED"; } else { - if (last_options_.comparator->Compare( - ikey.user_key, user_key) != 0) { + if (last_options_.comparator->Compare(ikey.user_key, user_key) != 0) { break; } if (!first) { @@ -314,135 +411,155 @@ class DBTest { }; TEST(DBTest, Empty) { - ASSERT_TRUE(db_ != NULL); - ASSERT_EQ("NOT_FOUND", Get("foo")); + do { + ASSERT_TRUE(db_ != NULL); + ASSERT_EQ("NOT_FOUND", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, ReadWrite) { - ASSERT_OK(Put("foo", "v1")); - ASSERT_EQ("v1", Get("foo")); - ASSERT_OK(Put("bar", "v2")); - ASSERT_OK(Put("foo", "v3")); - ASSERT_EQ("v3", Get("foo")); - ASSERT_EQ("v2", Get("bar")); + do { + ASSERT_OK(Put("foo", "v1")); + ASSERT_EQ("v1", Get("foo")); + ASSERT_OK(Put("bar", "v2")); + ASSERT_OK(Put("foo", "v3")); + ASSERT_EQ("v3", Get("foo")); + ASSERT_EQ("v2", Get("bar")); + } while (ChangeOptions()); } TEST(DBTest, PutDeleteGet) { - ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1")); - ASSERT_EQ("v1", Get("foo")); - ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2")); - ASSERT_EQ("v2", Get("foo")); - ASSERT_OK(db_->Delete(WriteOptions(), "foo")); - ASSERT_EQ("NOT_FOUND", Get("foo")); + do { + ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1")); + ASSERT_EQ("v1", Get("foo")); + ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2")); + ASSERT_EQ("v2", Get("foo")); + ASSERT_OK(db_->Delete(WriteOptions(), "foo")); + ASSERT_EQ("NOT_FOUND", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, GetFromImmutableLayer) { - Options options; - options.env = env_; - options.write_buffer_size = 100000; // Small write buffer - Reopen(&options); + do { + Options options = CurrentOptions(); + options.env = env_; + options.write_buffer_size = 100000; // Small write buffer + Reopen(&options); - ASSERT_OK(Put("foo", "v1")); - ASSERT_EQ("v1", Get("foo")); + ASSERT_OK(Put("foo", "v1")); + ASSERT_EQ("v1", Get("foo")); - env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls - Put("k1", std::string(100000, 'x')); // Fill memtable - Put("k2", std::string(100000, 'y')); // Trigger compaction - ASSERT_EQ("v1", Get("foo")); - env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls + env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls + Put("k1", std::string(100000, 'x')); // Fill memtable + Put("k2", std::string(100000, 'y')); // Trigger compaction + ASSERT_EQ("v1", Get("foo")); + env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls + } while (ChangeOptions()); } TEST(DBTest, GetFromVersions) { - ASSERT_OK(Put("foo", "v1")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("v1", Get("foo")); + do { + ASSERT_OK(Put("foo", "v1")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v1", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, GetSnapshot) { - // Try with both a short key and a long key - for (int i = 0; i < 2; i++) { - std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x'); - ASSERT_OK(Put(key, "v1")); - const Snapshot* s1 = db_->GetSnapshot(); - ASSERT_OK(Put(key, "v2")); - ASSERT_EQ("v2", Get(key)); - ASSERT_EQ("v1", Get(key, s1)); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("v2", Get(key)); - ASSERT_EQ("v1", Get(key, s1)); - db_->ReleaseSnapshot(s1); - } + do { + // Try with both a short key and a long key + for (int i = 0; i < 2; i++) { + std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x'); + ASSERT_OK(Put(key, "v1")); + const Snapshot* s1 = db_->GetSnapshot(); + ASSERT_OK(Put(key, "v2")); + ASSERT_EQ("v2", Get(key)); + ASSERT_EQ("v1", Get(key, s1)); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v2", Get(key)); + ASSERT_EQ("v1", Get(key, s1)); + db_->ReleaseSnapshot(s1); + } + } while (ChangeOptions()); } TEST(DBTest, GetLevel0Ordering) { - // Check that we process level-0 files in correct order. The code - // below generates two level-0 files where the earlier one comes - // before the later one in the level-0 file list since the earlier - // one has a smaller "smallest" key. - ASSERT_OK(Put("bar", "b")); - ASSERT_OK(Put("foo", "v1")); - dbfull()->TEST_CompactMemTable(); - ASSERT_OK(Put("foo", "v2")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("v2", Get("foo")); + do { + // Check that we process level-0 files in correct order. The code + // below generates two level-0 files where the earlier one comes + // before the later one in the level-0 file list since the earlier + // one has a smaller "smallest" key. + ASSERT_OK(Put("bar", "b")); + ASSERT_OK(Put("foo", "v1")); + dbfull()->TEST_CompactMemTable(); + ASSERT_OK(Put("foo", "v2")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v2", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, GetOrderedByLevels) { - ASSERT_OK(Put("foo", "v1")); - Compact("a", "z"); - ASSERT_EQ("v1", Get("foo")); - ASSERT_OK(Put("foo", "v2")); - ASSERT_EQ("v2", Get("foo")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("v2", Get("foo")); + do { + ASSERT_OK(Put("foo", "v1")); + Compact("a", "z"); + ASSERT_EQ("v1", Get("foo")); + ASSERT_OK(Put("foo", "v2")); + ASSERT_EQ("v2", Get("foo")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v2", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, GetPicksCorrectFile) { - // Arrange to have multiple files in a non-level-0 level. - ASSERT_OK(Put("a", "va")); - Compact("a", "b"); - ASSERT_OK(Put("x", "vx")); - Compact("x", "y"); - ASSERT_OK(Put("f", "vf")); - Compact("f", "g"); - ASSERT_EQ("va", Get("a")); - ASSERT_EQ("vf", Get("f")); - ASSERT_EQ("vx", Get("x")); + do { + // Arrange to have multiple files in a non-level-0 level. + ASSERT_OK(Put("a", "va")); + Compact("a", "b"); + ASSERT_OK(Put("x", "vx")); + Compact("x", "y"); + ASSERT_OK(Put("f", "vf")); + Compact("f", "g"); + ASSERT_EQ("va", Get("a")); + ASSERT_EQ("vf", Get("f")); + ASSERT_EQ("vx", Get("x")); + } while (ChangeOptions()); } TEST(DBTest, GetEncountersEmptyLevel) { - // Arrange for the following to happen: - // * sstable A in level 0 - // * nothing in level 1 - // * sstable B in level 2 - // Then do enough Get() calls to arrange for an automatic compaction - // of sstable A. A bug would cause the compaction to be marked as - // occuring at level 1 (instead of the correct level 0). - - // Step 1: First place sstables in levels 0 and 2 - int compaction_count = 0; - while (NumTableFilesAtLevel(0) == 0 || - NumTableFilesAtLevel(2) == 0) { - ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2"; - compaction_count++; - Put("a", "begin"); - Put("z", "end"); - dbfull()->TEST_CompactMemTable(); - } - - // Step 2: clear level 1 if necessary. - dbfull()->TEST_CompactRange(1, NULL, NULL); - ASSERT_EQ(NumTableFilesAtLevel(0), 1); - ASSERT_EQ(NumTableFilesAtLevel(1), 0); - ASSERT_EQ(NumTableFilesAtLevel(2), 1); + do { + // Arrange for the following to happen: + // * sstable A in level 0 + // * nothing in level 1 + // * sstable B in level 2 + // Then do enough Get() calls to arrange for an automatic compaction + // of sstable A. A bug would cause the compaction to be marked as + // occuring at level 1 (instead of the correct level 0). + + // Step 1: First place sstables in levels 0 and 2 + int compaction_count = 0; + while (NumTableFilesAtLevel(0) == 0 || + NumTableFilesAtLevel(2) == 0) { + ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2"; + compaction_count++; + Put("a", "begin"); + Put("z", "end"); + dbfull()->TEST_CompactMemTable(); + } - // Step 3: read until level 0 compaction disappears. - int read_count = 0; - while (NumTableFilesAtLevel(0) > 0) { - ASSERT_LE(read_count, 10000) << "did not trigger level 0 compaction"; - read_count++; - ASSERT_EQ("NOT_FOUND", Get("missing")); - } + // Step 2: clear level 1 if necessary. + dbfull()->TEST_CompactRange(1, NULL, NULL); + ASSERT_EQ(NumTableFilesAtLevel(0), 1); + ASSERT_EQ(NumTableFilesAtLevel(1), 0); + ASSERT_EQ(NumTableFilesAtLevel(2), 1); + + // Step 3: read until level 0 compaction disappears. + int read_count = 0; + while (NumTableFilesAtLevel(0) > 0) { + ASSERT_LE(read_count, 10000) << "did not trigger level 0 compaction"; + read_count++; + ASSERT_EQ("NOT_FOUND", Get("missing")); + } + } while (ChangeOptions()); } TEST(DBTest, IterEmpty) { @@ -620,69 +737,77 @@ TEST(DBTest, IterSmallAndLargeMix) { } TEST(DBTest, IterMultiWithDelete) { - ASSERT_OK(Put("a", "va")); - ASSERT_OK(Put("b", "vb")); - ASSERT_OK(Put("c", "vc")); - ASSERT_OK(Delete("b")); - ASSERT_EQ("NOT_FOUND", Get("b")); + do { + ASSERT_OK(Put("a", "va")); + ASSERT_OK(Put("b", "vb")); + ASSERT_OK(Put("c", "vc")); + ASSERT_OK(Delete("b")); + ASSERT_EQ("NOT_FOUND", Get("b")); - Iterator* iter = db_->NewIterator(ReadOptions()); - iter->Seek("c"); - ASSERT_EQ(IterStatus(iter), "c->vc"); - iter->Prev(); - ASSERT_EQ(IterStatus(iter), "a->va"); - delete iter; + Iterator* iter = db_->NewIterator(ReadOptions()); + iter->Seek("c"); + ASSERT_EQ(IterStatus(iter), "c->vc"); + iter->Prev(); + ASSERT_EQ(IterStatus(iter), "a->va"); + delete iter; + } while (ChangeOptions()); } TEST(DBTest, Recover) { - ASSERT_OK(Put("foo", "v1")); - ASSERT_OK(Put("baz", "v5")); + do { + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put("baz", "v5")); - Reopen(); - ASSERT_EQ("v1", Get("foo")); + Reopen(); + ASSERT_EQ("v1", Get("foo")); - ASSERT_EQ("v1", Get("foo")); - ASSERT_EQ("v5", Get("baz")); - ASSERT_OK(Put("bar", "v2")); - ASSERT_OK(Put("foo", "v3")); + ASSERT_EQ("v1", Get("foo")); + ASSERT_EQ("v5", Get("baz")); + ASSERT_OK(Put("bar", "v2")); + ASSERT_OK(Put("foo", "v3")); - Reopen(); - ASSERT_EQ("v3", Get("foo")); - ASSERT_OK(Put("foo", "v4")); - ASSERT_EQ("v4", Get("foo")); - ASSERT_EQ("v2", Get("bar")); - ASSERT_EQ("v5", Get("baz")); + Reopen(); + ASSERT_EQ("v3", Get("foo")); + ASSERT_OK(Put("foo", "v4")); + ASSERT_EQ("v4", Get("foo")); + ASSERT_EQ("v2", Get("bar")); + ASSERT_EQ("v5", Get("baz")); + } while (ChangeOptions()); } TEST(DBTest, RecoveryWithEmptyLog) { - ASSERT_OK(Put("foo", "v1")); - ASSERT_OK(Put("foo", "v2")); - Reopen(); - Reopen(); - ASSERT_OK(Put("foo", "v3")); - Reopen(); - ASSERT_EQ("v3", Get("foo")); + do { + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put("foo", "v2")); + Reopen(); + Reopen(); + ASSERT_OK(Put("foo", "v3")); + Reopen(); + ASSERT_EQ("v3", Get("foo")); + } while (ChangeOptions()); } // Check that writes done during a memtable compaction are recovered // if the database is shutdown during the memtable compaction. TEST(DBTest, RecoverDuringMemtableCompaction) { - Options options; - options.env = env_; - options.write_buffer_size = 1000000; - Reopen(&options); + do { + Options options = CurrentOptions(); + options.env = env_; + options.write_buffer_size = 1000000; + Reopen(&options); - // Trigger a long memtable compaction and reopen the database during it - ASSERT_OK(Put("foo", "v1")); // Goes to 1st log file - ASSERT_OK(Put("big1", std::string(10000000, 'x'))); // Fills memtable - ASSERT_OK(Put("big2", std::string(1000, 'y'))); // Triggers compaction - ASSERT_OK(Put("bar", "v2")); // Goes to new log file + // Trigger a long memtable compaction and reopen the database during it + ASSERT_OK(Put("foo", "v1")); // Goes to 1st log file + ASSERT_OK(Put("big1", std::string(10000000, 'x'))); // Fills memtable + ASSERT_OK(Put("big2", std::string(1000, 'y'))); // Triggers compaction + ASSERT_OK(Put("bar", "v2")); // Goes to new log file - Reopen(&options); - ASSERT_EQ("v1", Get("foo")); - ASSERT_EQ("v2", Get("bar")); - ASSERT_EQ(std::string(10000000, 'x'), Get("big1")); - ASSERT_EQ(std::string(1000, 'y'), Get("big2")); + Reopen(&options); + ASSERT_EQ("v1", Get("foo")); + ASSERT_EQ("v2", Get("bar")); + ASSERT_EQ(std::string(10000000, 'x'), Get("big1")); + ASSERT_EQ(std::string(1000, 'y'), Get("big2")); + } while (ChangeOptions()); } static std::string Key(int i) { @@ -692,7 +817,7 @@ static std::string Key(int i) { } TEST(DBTest, MinorCompactionsHappen) { - Options options; + Options options = CurrentOptions(); options.write_buffer_size = 10000; Reopen(&options); @@ -718,7 +843,7 @@ TEST(DBTest, MinorCompactionsHappen) { TEST(DBTest, RecoverWithLargeLog) { { - Options options; + Options options = CurrentOptions(); Reopen(&options); ASSERT_OK(Put("big1", std::string(200000, '1'))); ASSERT_OK(Put("big2", std::string(200000, '2'))); @@ -729,7 +854,7 @@ TEST(DBTest, RecoverWithLargeLog) { // Make sure that if we re-open with a small write buffer size that // we flush table files in the middle of a large log file. - Options options; + Options options = CurrentOptions(); options.write_buffer_size = 100000; Reopen(&options); ASSERT_EQ(NumTableFilesAtLevel(0), 3); @@ -741,7 +866,7 @@ TEST(DBTest, RecoverWithLargeLog) { } TEST(DBTest, CompactionsGenerateMultipleFiles) { - Options options; + Options options = CurrentOptions(); options.write_buffer_size = 100000000; // Large write buffer Reopen(&options); @@ -767,7 +892,7 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) { } TEST(DBTest, RepeatedWritesToSameKey) { - Options options; + Options options = CurrentOptions(); options.env = env_; options.write_buffer_size = 100000; // Small write buffer Reopen(&options); @@ -786,7 +911,7 @@ TEST(DBTest, RepeatedWritesToSameKey) { } TEST(DBTest, SparseMerge) { - Options options; + Options options = CurrentOptions(); options.compression = kNoCompression; Reopen(&options); @@ -837,87 +962,91 @@ static bool Between(uint64_t val, uint64_t low, uint64_t high) { } TEST(DBTest, ApproximateSizes) { - Options options; - options.write_buffer_size = 100000000; // Large write buffer - options.compression = kNoCompression; - DestroyAndReopen(); + do { + Options options = CurrentOptions(); + options.write_buffer_size = 100000000; // Large write buffer + options.compression = kNoCompression; + DestroyAndReopen(); - ASSERT_TRUE(Between(Size("", "xyz"), 0, 0)); - Reopen(&options); - ASSERT_TRUE(Between(Size("", "xyz"), 0, 0)); + ASSERT_TRUE(Between(Size("", "xyz"), 0, 0)); + Reopen(&options); + ASSERT_TRUE(Between(Size("", "xyz"), 0, 0)); - // Write 8MB (80 values, each 100K) - ASSERT_EQ(NumTableFilesAtLevel(0), 0); - const int N = 80; - Random rnd(301); - for (int i = 0; i < N; i++) { - ASSERT_OK(Put(Key(i), RandomString(&rnd, 100000))); - } + // Write 8MB (80 values, each 100K) + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + const int N = 80; + static const int S1 = 100000; + static const int S2 = 105000; // Allow some expansion from metadata + Random rnd(301); + for (int i = 0; i < N; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, S1))); + } - // 0 because GetApproximateSizes() does not account for memtable space - ASSERT_TRUE(Between(Size("", Key(50)), 0, 0)); + // 0 because GetApproximateSizes() does not account for memtable space + ASSERT_TRUE(Between(Size("", Key(50)), 0, 0)); - // Check sizes across recovery by reopening a few times - for (int run = 0; run < 3; run++) { - Reopen(&options); + // Check sizes across recovery by reopening a few times + for (int run = 0; run < 3; run++) { + Reopen(&options); - for (int compact_start = 0; compact_start < N; compact_start += 10) { - for (int i = 0; i < N; i += 10) { - ASSERT_TRUE(Between(Size("", Key(i)), 100000*i, 100000*i + 10000)); - ASSERT_TRUE(Between(Size("", Key(i)+".suffix"), - 100000 * (i+1), 100000 * (i+1) + 10000)); - ASSERT_TRUE(Between(Size(Key(i), Key(i+10)), - 100000 * 10, 100000 * 10 + 10000)); + for (int compact_start = 0; compact_start < N; compact_start += 10) { + for (int i = 0; i < N; i += 10) { + ASSERT_TRUE(Between(Size("", Key(i)), S1*i, S2*i)); + ASSERT_TRUE(Between(Size("", Key(i)+".suffix"), S1*(i+1), S2*(i+1))); + ASSERT_TRUE(Between(Size(Key(i), Key(i+10)), S1*10, S2*10)); + } + ASSERT_TRUE(Between(Size("", Key(50)), S1*50, S2*50)); + ASSERT_TRUE(Between(Size("", Key(50)+".suffix"), S1*50, S2*50)); + + std::string cstart_str = Key(compact_start); + std::string cend_str = Key(compact_start + 9); + Slice cstart = cstart_str; + Slice cend = cend_str; + dbfull()->TEST_CompactRange(0, &cstart, &cend); } - ASSERT_TRUE(Between(Size("", Key(50)), 5000000, 5010000)); - ASSERT_TRUE(Between(Size("", Key(50)+".suffix"), 5100000, 5110000)); - - std::string cstart_str = Key(compact_start); - std::string cend_str = Key(compact_start + 9); - Slice cstart = cstart_str; - Slice cend = cend_str; - dbfull()->TEST_CompactRange(0, &cstart, &cend); - } - ASSERT_EQ(NumTableFilesAtLevel(0), 0); - ASSERT_GT(NumTableFilesAtLevel(1), 0); - } + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + ASSERT_GT(NumTableFilesAtLevel(1), 0); + } + } while (ChangeOptions()); } TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) { - Options options; - options.compression = kNoCompression; - Reopen(); - - Random rnd(301); - std::string big1 = RandomString(&rnd, 100000); - ASSERT_OK(Put(Key(0), RandomString(&rnd, 10000))); - ASSERT_OK(Put(Key(1), RandomString(&rnd, 10000))); - ASSERT_OK(Put(Key(2), big1)); - ASSERT_OK(Put(Key(3), RandomString(&rnd, 10000))); - ASSERT_OK(Put(Key(4), big1)); - ASSERT_OK(Put(Key(5), RandomString(&rnd, 10000))); - ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000))); - ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000))); - - // Check sizes across recovery by reopening a few times - for (int run = 0; run < 3; run++) { - Reopen(&options); - - ASSERT_TRUE(Between(Size("", Key(0)), 0, 0)); - ASSERT_TRUE(Between(Size("", Key(1)), 10000, 11000)); - ASSERT_TRUE(Between(Size("", Key(2)), 20000, 21000)); - ASSERT_TRUE(Between(Size("", Key(3)), 120000, 121000)); - ASSERT_TRUE(Between(Size("", Key(4)), 130000, 131000)); - ASSERT_TRUE(Between(Size("", Key(5)), 230000, 231000)); - ASSERT_TRUE(Between(Size("", Key(6)), 240000, 241000)); - ASSERT_TRUE(Between(Size("", Key(7)), 540000, 541000)); - ASSERT_TRUE(Between(Size("", Key(8)), 550000, 551000)); - - ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000)); + do { + Options options = CurrentOptions(); + options.compression = kNoCompression; + Reopen(); - dbfull()->TEST_CompactRange(0, NULL, NULL); - } + Random rnd(301); + std::string big1 = RandomString(&rnd, 100000); + ASSERT_OK(Put(Key(0), RandomString(&rnd, 10000))); + ASSERT_OK(Put(Key(1), RandomString(&rnd, 10000))); + ASSERT_OK(Put(Key(2), big1)); + ASSERT_OK(Put(Key(3), RandomString(&rnd, 10000))); + ASSERT_OK(Put(Key(4), big1)); + ASSERT_OK(Put(Key(5), RandomString(&rnd, 10000))); + ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000))); + ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000))); + + // Check sizes across recovery by reopening a few times + for (int run = 0; run < 3; run++) { + Reopen(&options); + + ASSERT_TRUE(Between(Size("", Key(0)), 0, 0)); + ASSERT_TRUE(Between(Size("", Key(1)), 10000, 11000)); + ASSERT_TRUE(Between(Size("", Key(2)), 20000, 21000)); + ASSERT_TRUE(Between(Size("", Key(3)), 120000, 121000)); + ASSERT_TRUE(Between(Size("", Key(4)), 130000, 131000)); + ASSERT_TRUE(Between(Size("", Key(5)), 230000, 231000)); + ASSERT_TRUE(Between(Size("", Key(6)), 240000, 241000)); + ASSERT_TRUE(Between(Size("", Key(7)), 540000, 541000)); + ASSERT_TRUE(Between(Size("", Key(8)), 550000, 560000)); + + ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000)); + + dbfull()->TEST_CompactRange(0, NULL, NULL); + } + } while (ChangeOptions()); } TEST(DBTest, IteratorPinsRef) { @@ -943,59 +1072,63 @@ TEST(DBTest, IteratorPinsRef) { } TEST(DBTest, Snapshot) { - Put("foo", "v1"); - const Snapshot* s1 = db_->GetSnapshot(); - Put("foo", "v2"); - const Snapshot* s2 = db_->GetSnapshot(); - Put("foo", "v3"); - const Snapshot* s3 = db_->GetSnapshot(); - - Put("foo", "v4"); - ASSERT_EQ("v1", Get("foo", s1)); - ASSERT_EQ("v2", Get("foo", s2)); - ASSERT_EQ("v3", Get("foo", s3)); - ASSERT_EQ("v4", Get("foo")); - - db_->ReleaseSnapshot(s3); - ASSERT_EQ("v1", Get("foo", s1)); - ASSERT_EQ("v2", Get("foo", s2)); - ASSERT_EQ("v4", Get("foo")); - - db_->ReleaseSnapshot(s1); - ASSERT_EQ("v2", Get("foo", s2)); - ASSERT_EQ("v4", Get("foo")); - - db_->ReleaseSnapshot(s2); - ASSERT_EQ("v4", Get("foo")); -} + do { + Put("foo", "v1"); + const Snapshot* s1 = db_->GetSnapshot(); + Put("foo", "v2"); + const Snapshot* s2 = db_->GetSnapshot(); + Put("foo", "v3"); + const Snapshot* s3 = db_->GetSnapshot(); + + Put("foo", "v4"); + ASSERT_EQ("v1", Get("foo", s1)); + ASSERT_EQ("v2", Get("foo", s2)); + ASSERT_EQ("v3", Get("foo", s3)); + ASSERT_EQ("v4", Get("foo")); + + db_->ReleaseSnapshot(s3); + ASSERT_EQ("v1", Get("foo", s1)); + ASSERT_EQ("v2", Get("foo", s2)); + ASSERT_EQ("v4", Get("foo")); -TEST(DBTest, HiddenValuesAreRemoved) { - Random rnd(301); - FillLevels("a", "z"); + db_->ReleaseSnapshot(s1); + ASSERT_EQ("v2", Get("foo", s2)); + ASSERT_EQ("v4", Get("foo")); - std::string big = RandomString(&rnd, 50000); - Put("foo", big); - Put("pastfoo", "v"); - const Snapshot* snapshot = db_->GetSnapshot(); - Put("foo", "tiny"); - Put("pastfoo2", "v2"); // Advance sequence number one more + db_->ReleaseSnapshot(s2); + ASSERT_EQ("v4", Get("foo")); + } while (ChangeOptions()); +} - ASSERT_OK(dbfull()->TEST_CompactMemTable()); - ASSERT_GT(NumTableFilesAtLevel(0), 0); - - ASSERT_EQ(big, Get("foo", snapshot)); - ASSERT_TRUE(Between(Size("", "pastfoo"), 50000, 60000)); - db_->ReleaseSnapshot(snapshot); - ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]"); - Slice x("x"); - dbfull()->TEST_CompactRange(0, NULL, &x); - ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); - ASSERT_EQ(NumTableFilesAtLevel(0), 0); - ASSERT_GE(NumTableFilesAtLevel(1), 1); - dbfull()->TEST_CompactRange(1, NULL, &x); - ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); +TEST(DBTest, HiddenValuesAreRemoved) { + do { + Random rnd(301); + FillLevels("a", "z"); + + std::string big = RandomString(&rnd, 50000); + Put("foo", big); + Put("pastfoo", "v"); + const Snapshot* snapshot = db_->GetSnapshot(); + Put("foo", "tiny"); + Put("pastfoo2", "v2"); // Advance sequence number one more + + ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_GT(NumTableFilesAtLevel(0), 0); + + ASSERT_EQ(big, Get("foo", snapshot)); + ASSERT_TRUE(Between(Size("", "pastfoo"), 50000, 60000)); + db_->ReleaseSnapshot(snapshot); + ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]"); + Slice x("x"); + dbfull()->TEST_CompactRange(0, NULL, &x); + ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + ASSERT_GE(NumTableFilesAtLevel(1), 1); + dbfull()->TEST_CompactRange(1, NULL, &x); + ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); - ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000)); + ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000)); + } while (ChangeOptions()); } TEST(DBTest, DeletionMarkers1) { @@ -1054,85 +1187,87 @@ TEST(DBTest, DeletionMarkers2) { } TEST(DBTest, OverlapInLevel0) { - ASSERT_EQ(config::kMaxMemCompactLevel, 2) << "Fix test to match config"; + do { + ASSERT_EQ(config::kMaxMemCompactLevel, 2) << "Fix test to match config"; - // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0. - ASSERT_OK(Put("100", "v100")); - ASSERT_OK(Put("999", "v999")); - dbfull()->TEST_CompactMemTable(); - ASSERT_OK(Delete("100")); - ASSERT_OK(Delete("999")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("0,1,1", FilesPerLevel()); - - // Make files spanning the following ranges in level-0: - // files[0] 200 .. 900 - // files[1] 300 .. 500 - // Note that files are sorted by smallest key. - ASSERT_OK(Put("300", "v300")); - ASSERT_OK(Put("500", "v500")); - dbfull()->TEST_CompactMemTable(); - ASSERT_OK(Put("200", "v200")); - ASSERT_OK(Put("600", "v600")); - ASSERT_OK(Put("900", "v900")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("2,1,1", FilesPerLevel()); + // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0. + ASSERT_OK(Put("100", "v100")); + ASSERT_OK(Put("999", "v999")); + dbfull()->TEST_CompactMemTable(); + ASSERT_OK(Delete("100")); + ASSERT_OK(Delete("999")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("0,1,1", FilesPerLevel()); + + // Make files spanning the following ranges in level-0: + // files[0] 200 .. 900 + // files[1] 300 .. 500 + // Note that files are sorted by smallest key. + ASSERT_OK(Put("300", "v300")); + ASSERT_OK(Put("500", "v500")); + dbfull()->TEST_CompactMemTable(); + ASSERT_OK(Put("200", "v200")); + ASSERT_OK(Put("600", "v600")); + ASSERT_OK(Put("900", "v900")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("2,1,1", FilesPerLevel()); - // Compact away the placeholder files we created initially - dbfull()->TEST_CompactRange(1, NULL, NULL); - dbfull()->TEST_CompactRange(2, NULL, NULL); - ASSERT_EQ("2", FilesPerLevel()); + // Compact away the placeholder files we created initially + dbfull()->TEST_CompactRange(1, NULL, NULL); + dbfull()->TEST_CompactRange(2, NULL, NULL); + ASSERT_EQ("2", FilesPerLevel()); - // Do a memtable compaction. Before bug-fix, the compaction would - // not detect the overlap with level-0 files and would incorrectly place - // the deletion in a deeper level. - ASSERT_OK(Delete("600")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("3", FilesPerLevel()); - ASSERT_EQ("NOT_FOUND", Get("600")); + // Do a memtable compaction. Before bug-fix, the compaction would + // not detect the overlap with level-0 files and would incorrectly place + // the deletion in a deeper level. + ASSERT_OK(Delete("600")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("3", FilesPerLevel()); + ASSERT_EQ("NOT_FOUND", Get("600")); + } while (ChangeOptions()); } TEST(DBTest, L0_CompactionBug_Issue44_a) { - Reopen(); - ASSERT_OK(Put("b", "v")); - Reopen(); - ASSERT_OK(Delete("b")); - ASSERT_OK(Delete("a")); - Reopen(); - ASSERT_OK(Delete("a")); - Reopen(); - ASSERT_OK(Put("a", "v")); - Reopen(); - Reopen(); - ASSERT_EQ("(a->v)", Contents()); - env_->SleepForMicroseconds(1000000); // Wait for compaction to finish - ASSERT_EQ("(a->v)", Contents()); + Reopen(); + ASSERT_OK(Put("b", "v")); + Reopen(); + ASSERT_OK(Delete("b")); + ASSERT_OK(Delete("a")); + Reopen(); + ASSERT_OK(Delete("a")); + Reopen(); + ASSERT_OK(Put("a", "v")); + Reopen(); + Reopen(); + ASSERT_EQ("(a->v)", Contents()); + env_->SleepForMicroseconds(1000000); // Wait for compaction to finish + ASSERT_EQ("(a->v)", Contents()); } TEST(DBTest, L0_CompactionBug_Issue44_b) { - Reopen(); - Put("",""); - Reopen(); - Delete("e"); - Put("",""); - Reopen(); - Put("c", "cv"); - Reopen(); - Put("",""); - Reopen(); - Put("",""); - env_->SleepForMicroseconds(1000000); // Wait for compaction to finish - Reopen(); - Put("d","dv"); - Reopen(); - Put("",""); - Reopen(); - Delete("d"); - Delete("b"); - Reopen(); - ASSERT_EQ("(->)(c->cv)", Contents()); - env_->SleepForMicroseconds(1000000); // Wait for compaction to finish - ASSERT_EQ("(->)(c->cv)", Contents()); + Reopen(); + Put("",""); + Reopen(); + Delete("e"); + Put("",""); + Reopen(); + Put("c", "cv"); + Reopen(); + Put("",""); + Reopen(); + Put("",""); + env_->SleepForMicroseconds(1000000); // Wait for compaction to finish + Reopen(); + Put("d","dv"); + Reopen(); + Put("",""); + Reopen(); + Delete("d"); + Delete("b"); + Reopen(); + ASSERT_EQ("(->)(c->cv)", Contents()); + env_->SleepForMicroseconds(1000000); // Wait for compaction to finish + ASSERT_EQ("(->)(c->cv)", Contents()); } TEST(DBTest, ComparatorCheck) { @@ -1150,7 +1285,7 @@ TEST(DBTest, ComparatorCheck) { } }; NewComparator cmp; - Options new_options; + Options new_options = CurrentOptions(); new_options.comparator = &cmp; Status s = TryReopen(&new_options); ASSERT_TRUE(!s.ok()); @@ -1185,9 +1320,10 @@ TEST(DBTest, CustomComparator) { } }; NumberComparator cmp; - Options new_options; + Options new_options = CurrentOptions(); new_options.create_if_missing = true; new_options.comparator = &cmp; + new_options.filter_policy = NULL; // Cannot use bloom filters new_options.write_buffer_size = 1000; // Compact more often DestroyAndReopen(&new_options); ASSERT_OK(Put("[10]", "ten")); @@ -1197,6 +1333,8 @@ TEST(DBTest, CustomComparator) { ASSERT_EQ("ten", Get("[0xa]")); ASSERT_EQ("twenty", Get("[20]")); ASSERT_EQ("twenty", Get("[0x14]")); + ASSERT_EQ("NOT_FOUND", Get("[15]")); + ASSERT_EQ("NOT_FOUND", Get("[0xf]")); Compact("[0]", "[9999]"); } @@ -1285,7 +1423,7 @@ TEST(DBTest, DBOpen_Options) { // Check that number of files does not grow when we are out of space TEST(DBTest, NoSpace) { - Options options; + Options options = CurrentOptions(); options.env = env_; Reopen(&options); @@ -1314,6 +1452,53 @@ TEST(DBTest, FilesDeletedAfterCompaction) { ASSERT_EQ(CountFiles(), num_files); } +TEST(DBTest, BloomFilter) { + env_->count_random_reads_ = true; + Options options = CurrentOptions(); + options.env = env_; + options.block_cache = NewLRUCache(0); // Prevent cache hits + options.filter_policy = NewBloomFilterPolicy(10); + Reopen(&options); + + // Populate multiple layers + const int N = 10000; + for (int i = 0; i < N; i++) { + ASSERT_OK(Put(Key(i), Key(i))); + } + Compact("a", "z"); + for (int i = 0; i < N; i += 100) { + ASSERT_OK(Put(Key(i), Key(i))); + } + dbfull()->TEST_CompactMemTable(); + + // Prevent auto compactions triggered by seeks + env_->delay_sstable_sync_.Release_Store(env_); + + // Lookup present keys. Should rarely read from small sstable. + env_->random_read_counter_.Reset(); + for (int i = 0; i < N; i++) { + ASSERT_EQ(Key(i), Get(Key(i))); + } + int reads = env_->random_read_counter_.Read(); + fprintf(stderr, "%d present => %d reads\n", N, reads); + ASSERT_GE(reads, N); + ASSERT_LE(reads, N + 2*N/100); + + // Lookup present keys. Should rarely read from either sstable. + env_->random_read_counter_.Reset(); + for (int i = 0; i < N; i++) { + ASSERT_EQ("NOT_FOUND", Get(Key(i) + ".missing")); + } + reads = env_->random_read_counter_.Read(); + fprintf(stderr, "%d missing => %d reads\n", N, reads); + ASSERT_LE(reads, 3*N/100); + + env_->delay_sstable_sync_.Release_Store(NULL); + Close(); + delete options.block_cache; + delete options.filter_policy; +} + // Multi-threaded test: namespace { @@ -1381,33 +1566,35 @@ static void MTThreadBody(void* arg) { } // namespace TEST(DBTest, MultiThreaded) { - // Initialize state - MTState mt; - mt.test = this; - mt.stop.Release_Store(0); - for (int id = 0; id < kNumThreads; id++) { - mt.counter[id].Release_Store(0); - mt.thread_done[id].Release_Store(0); - } - - // Start threads - MTThread thread[kNumThreads]; - for (int id = 0; id < kNumThreads; id++) { - thread[id].state = &mt; - thread[id].id = id; - env_->StartThread(MTThreadBody, &thread[id]); - } - - // Let them run for a while - env_->SleepForMicroseconds(kTestSeconds * 1000000); - - // Stop the threads and wait for them to finish - mt.stop.Release_Store(&mt); - for (int id = 0; id < kNumThreads; id++) { - while (mt.thread_done[id].Acquire_Load() == NULL) { - env_->SleepForMicroseconds(100000); + do { + // Initialize state + MTState mt; + mt.test = this; + mt.stop.Release_Store(0); + for (int id = 0; id < kNumThreads; id++) { + mt.counter[id].Release_Store(0); + mt.thread_done[id].Release_Store(0); } - } + + // Start threads + MTThread thread[kNumThreads]; + for (int id = 0; id < kNumThreads; id++) { + thread[id].state = &mt; + thread[id].id = id; + env_->StartThread(MTThreadBody, &thread[id]); + } + + // Let them run for a while + env_->SleepForMicroseconds(kTestSeconds * 1000000); + + // Stop the threads and wait for them to finish + mt.stop.Release_Store(&mt); + for (int id = 0; id < kNumThreads; id++) { + while (mt.thread_done[id].Acquire_Load() == NULL) { + env_->SleepForMicroseconds(100000); + } + } + } while (ChangeOptions()); } namespace { @@ -1573,70 +1760,73 @@ static bool CompareIterators(int step, TEST(DBTest, Randomized) { Random rnd(test::RandomSeed()); - ModelDB model(last_options_); - const int N = 10000; - const Snapshot* model_snap = NULL; - const Snapshot* db_snap = NULL; - std::string k, v; - for (int step = 0; step < N; step++) { - if (step % 100 == 0) { - fprintf(stderr, "Step %d of %d\n", step, N); - } - int p = rnd.Uniform(100); - if (p < 45) { // Put - k = RandomKey(&rnd); - v = RandomString(&rnd, - rnd.OneIn(20) - ? 100 + rnd.Uniform(100) - : rnd.Uniform(8)); - ASSERT_OK(model.Put(WriteOptions(), k, v)); - ASSERT_OK(db_->Put(WriteOptions(), k, v)); - - } else if (p < 90) { // Delete - k = RandomKey(&rnd); - ASSERT_OK(model.Delete(WriteOptions(), k)); - ASSERT_OK(db_->Delete(WriteOptions(), k)); - - - } else { // Multi-element batch - WriteBatch b; - const int num = rnd.Uniform(8); - for (int i = 0; i < num; i++) { - if (i == 0 || !rnd.OneIn(10)) { - k = RandomKey(&rnd); - } else { - // Periodically re-use the same key from the previous iter, so - // we have multiple entries in the write batch for the same key - } - if (rnd.OneIn(2)) { - v = RandomString(&rnd, rnd.Uniform(10)); - b.Put(k, v); - } else { - b.Delete(k); + do { + ModelDB model(CurrentOptions()); + const int N = 10000; + const Snapshot* model_snap = NULL; + const Snapshot* db_snap = NULL; + std::string k, v; + for (int step = 0; step < N; step++) { + if (step % 100 == 0) { + fprintf(stderr, "Step %d of %d\n", step, N); + } + // TODO(sanjay): Test Get() works + int p = rnd.Uniform(100); + if (p < 45) { // Put + k = RandomKey(&rnd); + v = RandomString(&rnd, + rnd.OneIn(20) + ? 100 + rnd.Uniform(100) + : rnd.Uniform(8)); + ASSERT_OK(model.Put(WriteOptions(), k, v)); + ASSERT_OK(db_->Put(WriteOptions(), k, v)); + + } else if (p < 90) { // Delete + k = RandomKey(&rnd); + ASSERT_OK(model.Delete(WriteOptions(), k)); + ASSERT_OK(db_->Delete(WriteOptions(), k)); + + + } else { // Multi-element batch + WriteBatch b; + const int num = rnd.Uniform(8); + for (int i = 0; i < num; i++) { + if (i == 0 || !rnd.OneIn(10)) { + k = RandomKey(&rnd); + } else { + // Periodically re-use the same key from the previous iter, so + // we have multiple entries in the write batch for the same key + } + if (rnd.OneIn(2)) { + v = RandomString(&rnd, rnd.Uniform(10)); + b.Put(k, v); + } else { + b.Delete(k); + } } + ASSERT_OK(model.Write(WriteOptions(), &b)); + ASSERT_OK(db_->Write(WriteOptions(), &b)); } - ASSERT_OK(model.Write(WriteOptions(), &b)); - ASSERT_OK(db_->Write(WriteOptions(), &b)); - } - if ((step % 100) == 0) { - ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL)); - ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap)); - // Save a snapshot from each DB this time that we'll use next - // time we compare things, to make sure the current state is - // preserved with the snapshot - if (model_snap != NULL) model.ReleaseSnapshot(model_snap); - if (db_snap != NULL) db_->ReleaseSnapshot(db_snap); + if ((step % 100) == 0) { + ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL)); + ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap)); + // Save a snapshot from each DB this time that we'll use next + // time we compare things, to make sure the current state is + // preserved with the snapshot + if (model_snap != NULL) model.ReleaseSnapshot(model_snap); + if (db_snap != NULL) db_->ReleaseSnapshot(db_snap); - Reopen(); - ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL)); + Reopen(); + ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL)); - model_snap = model.GetSnapshot(); - db_snap = db_->GetSnapshot(); + model_snap = model.GetSnapshot(); + db_snap = db_->GetSnapshot(); + } } - } - if (model_snap != NULL) model.ReleaseSnapshot(model_snap); - if (db_snap != NULL) db_->ReleaseSnapshot(db_snap); + if (model_snap != NULL) model.ReleaseSnapshot(model_snap); + if (db_snap != NULL) db_->ReleaseSnapshot(db_snap); + } while (ChangeOptions()); } std::string MakeKey(unsigned int num) { diff --git a/db/dbformat.cc b/db/dbformat.cc index 9168f99..28e11b3 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -98,6 +98,26 @@ void InternalKeyComparator::FindShortSuccessor(std::string* key) const { } } +const char* InternalFilterPolicy::Name() const { + return user_policy_->Name(); +} + +void InternalFilterPolicy::CreateFilter(const Slice* keys, int n, + std::string* dst) const { + // We rely on the fact that the code in table.cc does not mind us + // adjusting keys[]. + Slice* mkey = const_cast<Slice*>(keys); + for (int i = 0; i < n; i++) { + mkey[i] = ExtractUserKey(keys[i]); + // TODO(sanjay): Suppress dups? + } + user_policy_->CreateFilter(keys, n, dst); +} + +bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const { + return user_policy_->KeyMayMatch(ExtractUserKey(key), f); +} + LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { size_t usize = user_key.size(); size_t needed = usize + 13; // A conservative estimate diff --git a/db/dbformat.h b/db/dbformat.h index 044717d..f7f64da 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -8,6 +8,7 @@ #include <stdio.h> #include "leveldb/comparator.h" #include "leveldb/db.h" +#include "leveldb/filter_policy.h" #include "leveldb/slice.h" #include "leveldb/table_builder.h" #include "util/coding.h" @@ -123,6 +124,17 @@ class InternalKeyComparator : public Comparator { int Compare(const InternalKey& a, const InternalKey& b) const; }; +// Filter policy wrapper that converts from internal keys to user keys +class InternalFilterPolicy : public FilterPolicy { + private: + const FilterPolicy* const user_policy_; + public: + explicit InternalFilterPolicy(const FilterPolicy* p) : user_policy_(p) { } + virtual const char* Name() const; + virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const; + virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const; +}; + // Modules in this directory should keep internal keys wrapped inside // the following class instead of plain strings so that we do not // incorrectly use string comparisons instead of an InternalKeyComparator. diff --git a/db/repair.cc b/db/repair.cc index 511c66b..022d52f 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -48,7 +48,8 @@ class Repairer { : dbname_(dbname), env_(options.env), icmp_(options.comparator), - options_(SanitizeOptions(dbname, &icmp_, options)), + ipolicy_(options.filter_policy), + options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)), owns_info_log_(options_.info_log != options.info_log), owns_cache_(options_.block_cache != options.block_cache), next_file_number_(1) { @@ -99,6 +100,7 @@ class Repairer { std::string const dbname_; Env* const env_; InternalKeyComparator const icmp_; + InternalFilterPolicy const ipolicy_; Options const options_; bool owns_info_log_; bool owns_cache_; diff --git a/db/table_cache.cc b/db/table_cache.cc index cae79bd..497db27 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -42,23 +42,18 @@ TableCache::~TableCache() { delete cache_; } -Iterator* TableCache::NewIterator(const ReadOptions& options, - uint64_t file_number, - uint64_t file_size, - Table** tableptr) { - if (tableptr != NULL) { - *tableptr = NULL; - } - +Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, + Cache::Handle** handle) { + Status s; char buf[sizeof(file_number)]; EncodeFixed64(buf, file_number); Slice key(buf, sizeof(buf)); - Cache::Handle* handle = cache_->Lookup(key); - if (handle == NULL) { + *handle = cache_->Lookup(key); + if (*handle == NULL) { std::string fname = TableFileName(dbname_, file_number); RandomAccessFile* file = NULL; Table* table = NULL; - Status s = env_->NewRandomAccessFile(fname, &file); + s = env_->NewRandomAccessFile(fname, &file); if (s.ok()) { s = Table::Open(*options_, file, file_size, &table); } @@ -68,13 +63,28 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, delete file; // We do not cache error results so that if the error is transient, // or somebody repairs the file, we recover automatically. - return NewErrorIterator(s); + } else { + TableAndFile* tf = new TableAndFile; + tf->file = file; + tf->table = table; + *handle = cache_->Insert(key, tf, 1, &DeleteEntry); } + } + return s; +} - TableAndFile* tf = new TableAndFile; - tf->file = file; - tf->table = table; - handle = cache_->Insert(key, tf, 1, &DeleteEntry); +Iterator* TableCache::NewIterator(const ReadOptions& options, + uint64_t file_number, + uint64_t file_size, + Table** tableptr) { + if (tableptr != NULL) { + *tableptr = NULL; + } + + Cache::Handle* handle = NULL; + Status s = FindTable(file_number, file_size, &handle); + if (!s.ok()) { + return NewErrorIterator(s); } Table* table = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table; @@ -86,6 +96,22 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, return result; } +Status TableCache::Get(const ReadOptions& options, + uint64_t file_number, + uint64_t file_size, + const Slice& k, + void* arg, + void (*saver)(void*, const Slice&, const Slice&)) { + Cache::Handle* handle = NULL; + Status s = FindTable(file_number, file_size, &handle); + if (s.ok()) { + Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table; + s = t->InternalGet(options, k, arg, saver); + cache_->Release(handle); + } + return s; +} + void TableCache::Evict(uint64_t file_number) { char buf[sizeof(file_number)]; EncodeFixed64(buf, file_number); diff --git a/db/table_cache.h b/db/table_cache.h index 0f3c73b..8cf4aaf 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -35,6 +35,15 @@ class TableCache { uint64_t file_size, Table** tableptr = NULL); + // If a seek to internal key "k" in specified file finds an entry, + // call (*handle_result)(arg, found_key, found_value). + Status Get(const ReadOptions& options, + uint64_t file_number, + uint64_t file_size, + const Slice& k, + void* arg, + void (*handle_result)(void*, const Slice&, const Slice&)); + // Evict any entry for the specified file number void Evict(uint64_t file_number); @@ -43,6 +52,8 @@ class TableCache { const std::string dbname_; const Options* options_; Cache* cache_; + + Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**); }; } // namespace leveldb diff --git a/db/version_set.cc b/db/version_set.cc index 1310aeb..1f48419 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -255,35 +255,34 @@ void Version::AddIterators(const ReadOptions& options, } } -// If "*iter" points at a value or deletion for user_key, store -// either the value, or a NotFound error and return true. -// Else return false. -static bool GetValue(const Comparator* cmp, - Iterator* iter, const Slice& user_key, - std::string* value, - Status* s) { - if (!iter->Valid()) { - return false; - } +// Callback from TableCache::Get() +namespace { +enum SaverState { + kNotFound, + kFound, + kDeleted, + kCorrupt, +}; +struct Saver { + SaverState state; + const Comparator* ucmp; + Slice user_key; + std::string* value; +}; +} +static void SaveValue(void* arg, const Slice& ikey, const Slice& v) { + Saver* s = reinterpret_cast<Saver*>(arg); ParsedInternalKey parsed_key; - if (!ParseInternalKey(iter->key(), &parsed_key)) { - *s = Status::Corruption("corrupted key for ", user_key); - return true; - } - if (cmp->Compare(parsed_key.user_key, user_key) != 0) { - return false; - } - switch (parsed_key.type) { - case kTypeDeletion: - *s = Status::NotFound(Slice()); // Use an empty error message for speed - break; - case kTypeValue: { - Slice v = iter->value(); - value->assign(v.data(), v.size()); - break; + if (!ParseInternalKey(ikey, &parsed_key)) { + s->state = kCorrupt; + } else { + if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { + s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted; + if (s->state == kFound) { + s->value->assign(v.data(), v.size()); + } } } - return true; } static bool NewestFirst(FileMetaData* a, FileMetaData* b) { @@ -361,21 +360,27 @@ Status Version::Get(const ReadOptions& options, last_file_read = f; last_file_read_level = level; - Iterator* iter = vset_->table_cache_->NewIterator( - options, - f->number, - f->file_size); - iter->Seek(ikey); - const bool done = GetValue(ucmp, iter, user_key, value, &s); - if (!iter->status().ok()) { - s = iter->status(); - delete iter; + Saver saver; + saver.state = kNotFound; + saver.ucmp = ucmp; + saver.user_key = user_key; + saver.value = value; + s = vset_->table_cache_->Get(options, f->number, f->file_size, + ikey, &saver, SaveValue); + if (!s.ok()) { return s; - } else { - delete iter; - if (done) { + } + switch (saver.state) { + case kNotFound: + break; // Keep searching in other files + case kFound: + return s; + case kDeleted: + s = Status::NotFound(Slice()); // Use empty error message for speed + return s; + case kCorrupt: + s = Status::Corruption("corrupted key for ", user_key); return s; - } } } } |