diff options
author | jorlow@chromium.org <jorlow@chromium.org@62dab493-f737-651d-591e-8d6aee1b9529> | 2011-03-18 22:37:00 +0000 |
---|---|---|
committer | jorlow@chromium.org <jorlow@chromium.org@62dab493-f737-651d-591e-8d6aee1b9529> | 2011-03-18 22:37:00 +0000 |
commit | f67e15e50f392625b4097caf22e8be1b0fe96013 (patch) | |
tree | 1cb1764c7627f9bac27ed0e0abf27010156e5007 /db/db_iter.cc | |
parent | 54f1fd7eef101db1dfb2bb66a59083c45a38aa4a (diff) | |
download | leveldb-f67e15e50f392625b4097caf22e8be1b0fe96013.tar.gz |
Initial checkin.
git-svn-id: https://leveldb.googlecode.com/svn/trunk@2 62dab493-f737-651d-591e-8d6aee1b9529
Diffstat (limited to 'db/db_iter.cc')
-rw-r--r-- | db/db_iter.cc | 412 |
1 files changed, 412 insertions, 0 deletions
diff --git a/db/db_iter.cc b/db/db_iter.cc new file mode 100644 index 0000000..c23de22 --- /dev/null +++ b/db/db_iter.cc @@ -0,0 +1,412 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_iter.h" + +#include "db/filename.h" +#include "db/dbformat.h" +#include "include/env.h" +#include "include/iterator.h" +#include "port/port.h" +#include "util/logging.h" +#include "util/mutexlock.h" + +namespace leveldb { + +#if 0 +static void DumpInternalIter(Iterator* iter) { + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ParsedInternalKey k; + if (!ParseInternalKey(iter->key(), &k)) { + fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str()); + } else { + fprintf(stderr, "@ '%s'\n", k.DebugString().c_str()); + } + } +} +#endif + +namespace { + +// Memtables and sstables that make the DB representation contain +// (userkey,seq,type) => uservalue entries. DBIter +// combines multiple entries for the same userkey found in the DB +// representation into a single entry while accounting for sequence +// numbers, deletion markers, overwrites, etc. +class DBIter: public Iterator { + public: + DBIter(const std::string* dbname, Env* env, + const Comparator* cmp, Iterator* iter, SequenceNumber s) + : dbname_(dbname), + env_(env), + user_comparator_(cmp), + iter_(iter), + sequence_(s), + large_(NULL), + valid_(false) { + } + virtual ~DBIter() { + delete iter_; + delete large_; + } + virtual bool Valid() const { return valid_; } + virtual Slice key() const { + assert(valid_); + return key_; + } + virtual Slice value() const { + assert(valid_); + if (large_ == NULL) { + return value_; + } else { + MutexLock l(&large_->mutex); + if (!large_->produced) { + ReadIndirectValue(); + } + return large_->value; + } + } + + virtual void Next() { + assert(valid_); + // iter_ is already positioned past DBIter::key() + FindNextUserEntry(); + } + + virtual void Prev() { + assert(valid_); + bool ignored; + ScanUntilBeforeCurrentKey(&ignored); + FindPrevUserEntry(); + } + + virtual void Seek(const Slice& target) { + ParsedInternalKey ikey(target, sequence_, kValueTypeForSeek); + std::string tmp; + AppendInternalKey(&tmp, ikey); + iter_->Seek(tmp); + FindNextUserEntry(); + } + virtual void SeekToFirst() { + iter_->SeekToFirst(); + FindNextUserEntry(); + } + + virtual void SeekToLast(); + + virtual Status status() const { + if (status_.ok()) { + if (large_ != NULL && !large_->status.ok()) return large_->status; + return iter_->status(); + } else { + return status_; + } + } + + private: + void FindNextUserEntry(); + void FindPrevUserEntry(); + void SaveKey(const Slice& k) { key_.assign(k.data(), k.size()); } + void SaveValue(const Slice& v) { + if (value_.capacity() > v.size() + 1048576) { + std::string empty; + swap(empty, value_); + } + value_.assign(v.data(), v.size()); + } + bool ParseKey(ParsedInternalKey* key); + void SkipPast(const Slice& k); + void ScanUntilBeforeCurrentKey(bool* found_live); + + void ReadIndirectValue() const; + + struct Large { + port::Mutex mutex; + std::string value; + bool produced; + Status status; + }; + + const std::string* const dbname_; + Env* const env_; + + const Comparator* const user_comparator_; + + // iter_ is positioned just past current entry for DBIter if valid_ + Iterator* const iter_; + + SequenceNumber const sequence_; + Status status_; + std::string key_; // Always a user key + std::string value_; + Large* large_; // Non-NULL if value is an indirect reference + bool valid_; + + // No copying allowed + DBIter(const DBIter&); + void operator=(const DBIter&); +}; + +inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { + if (!ParseInternalKey(iter_->key(), ikey)) { + status_ = Status::Corruption("corrupted internal key in DBIter"); + return false; + } else { + return true; + } +} + +void DBIter::FindNextUserEntry() { + if (large_ != NULL) { + if (status_.ok() && !large_->status.ok()) { + status_ = large_->status; + } + delete large_; + large_ = NULL; + } + while (iter_->Valid()) { + ParsedInternalKey ikey; + if (!ParseKey(&ikey)) { + // Skip past corrupted entry + iter_->Next(); + continue; + } + if (ikey.sequence > sequence_) { + // Ignore entries newer than the snapshot + iter_->Next(); + continue; + } + + switch (ikey.type) { + case kTypeDeletion: + SaveKey(ikey.user_key); // Make local copy for use by SkipPast() + iter_->Next(); + SkipPast(key_); + // Do not return deleted entries. Instead keep looping. + break; + + case kTypeValue: + SaveKey(ikey.user_key); + SaveValue(iter_->value()); + iter_->Next(); + SkipPast(key_); + // Yield the value we just found. + valid_ = true; + return; + + case kTypeLargeValueRef: + SaveKey(ikey.user_key); + // Save the large value ref as value_, and read it lazily on a call + // to value() + SaveValue(iter_->value()); + large_ = new Large; + large_->produced = false; + iter_->Next(); + SkipPast(key_); + // Yield the value we just found. + valid_ = true; + return; + } + } + valid_ = false; + key_.clear(); + value_.clear(); + assert(large_ == NULL); +} + +void DBIter::SkipPast(const Slice& k) { + while (iter_->Valid()) { + ParsedInternalKey ikey; + // Note that if we cannot parse an internal key, we keep looping + // so that if we have a run like the following: + // <x,100,v> => value100 + // <corrupted entry for user key x> + // <x,50,v> => value50 + // we will skip over the corrupted entry as well as value50. + if (ParseKey(&ikey) && user_comparator_->Compare(ikey.user_key, k) != 0) { + break; + } + iter_->Next(); + } +} + +void DBIter::SeekToLast() { + // Position iter_ at the last uncorrupted user key and then + // let FindPrevUserEntry() do the heavy lifting to find + // a user key that is live. + iter_->SeekToLast(); + ParsedInternalKey current; + while (iter_->Valid() && !ParseKey(¤t)) { + iter_->Prev(); + } + if (iter_->Valid()) { + SaveKey(current.user_key); + } + FindPrevUserEntry(); +} + +// Let X be the user key at which iter_ is currently positioned. +// Adjust DBIter to point at the last entry with a key <= X that +// has a live value. +void DBIter::FindPrevUserEntry() { + // Consider the following example: + // + // A@540 + // A@400 + // + // B@300 + // B@200 + // B@100 <- iter_ + // + // C@301 + // C@201 + // + // The comments marked "(first iteration)" below relate what happens + // for the preceding example in the first iteration of the while loop + // below. There may be more than one iteration either if there are + // no live values for B, or if there is a corruption. + while (iter_->Valid()) { + std::string saved = key_; + bool found_live; + ScanUntilBeforeCurrentKey(&found_live); + // (first iteration) iter_ at A@400 + if (found_live) { + // Step forward into range of entries with user key >= saved + if (!iter_->Valid()) { + iter_->SeekToFirst(); + } else { + iter_->Next(); + } + // (first iteration) iter_ at B@300 + + FindNextUserEntry(); // Sets key_ to the key of the next value it found + if (valid_ && user_comparator_->Compare(key_, saved) == 0) { + // (first iteration) iter_ at C@301 + return; + } + + // FindNextUserEntry() could not find any entries under the + // user key "saved". This is probably a corruption since + // ScanUntilBefore(saved) found a live value. So we skip + // backwards to an earlier key and ignore the corrupted + // entries for "saved". + // + // (first iteration) iter_ at C@301 and saved == "B" + key_ = saved; + bool ignored; + ScanUntilBeforeCurrentKey(&ignored); + // (first iteration) iter_ at A@400 + } + } + valid_ = false; + key_.clear(); + value_.clear(); +} + +void DBIter::ScanUntilBeforeCurrentKey(bool* found_live) { + *found_live = false; + if (!iter_->Valid()) { + iter_->SeekToLast(); + } + + while (iter_->Valid()) { + ParsedInternalKey current; + if (!ParseKey(¤t)) { + iter_->Prev(); + continue; + } + + if (current.sequence > sequence_) { + // Ignore entries that are serialized after this read + iter_->Prev(); + continue; + } + + const int cmp = user_comparator_->Compare(current.user_key, key_); + if (cmp < 0) { + SaveKey(current.user_key); + return; + } else if (cmp == 0) { + switch (current.type) { + case kTypeDeletion: + *found_live = false; + break; + + case kTypeValue: + case kTypeLargeValueRef: + *found_live = true; + break; + } + } else { // cmp > 0 + *found_live = false; + } + + iter_->Prev(); + } +} + +void DBIter::ReadIndirectValue() const { + assert(!large_->produced); + large_->produced = true; + LargeValueRef large_ref; + if (value_.size() != LargeValueRef::ByteSize()) { + large_->status = Status::Corruption("malformed large value reference"); + return; + } + memcpy(large_ref.data, value_.data(), LargeValueRef::ByteSize()); + std::string fname = LargeValueFileName(*dbname_, large_ref); + RandomAccessFile* file; + Status s = env_->NewRandomAccessFile(fname, &file); + if (s.ok()) { + uint64_t file_size = file->Size(); + uint64_t value_size = large_ref.ValueSize(); + large_->value.resize(value_size); + Slice result; + s = file->Read(0, file_size, &result, + const_cast<char*>(large_->value.data())); + if (s.ok()) { + if (result.size() == file_size) { + switch (large_ref.compression_type()) { + case kNoCompression: { + if (result.data() != large_->value.data()) { + large_->value.assign(result.data(), result.size()); + } + break; + } + case kLightweightCompression: { + std::string uncompressed; + if (port::Lightweight_Uncompress(result.data(), result.size(), + &uncompressed) && + uncompressed.size() == large_ref.ValueSize()) { + swap(uncompressed, large_->value); + } else { + s = Status::Corruption( + "Unable to read entire compressed large value file"); + } + } + } + } else { + s = Status::Corruption("Unable to read entire large value file"); + } + } + delete file; // Ignore errors on closing + } + if (!s.ok()) { + large_->value.clear(); + large_->status = s; + } +} + +} // anonymous namespace + +Iterator* NewDBIterator( + const std::string* dbname, + Env* env, + const Comparator* user_key_comparator, + Iterator* internal_iter, + const SequenceNumber& sequence) { + return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence); +} + +} |