summaryrefslogtreecommitdiff
path: root/db/db_iter.cc
diff options
context:
space:
mode:
authorjorlow@chromium.org <jorlow@chromium.org@62dab493-f737-651d-591e-8d6aee1b9529>2011-03-18 22:37:00 +0000
committerjorlow@chromium.org <jorlow@chromium.org@62dab493-f737-651d-591e-8d6aee1b9529>2011-03-18 22:37:00 +0000
commitf67e15e50f392625b4097caf22e8be1b0fe96013 (patch)
tree1cb1764c7627f9bac27ed0e0abf27010156e5007 /db/db_iter.cc
parent54f1fd7eef101db1dfb2bb66a59083c45a38aa4a (diff)
downloadleveldb-f67e15e50f392625b4097caf22e8be1b0fe96013.tar.gz
Initial checkin.
git-svn-id: https://leveldb.googlecode.com/svn/trunk@2 62dab493-f737-651d-591e-8d6aee1b9529
Diffstat (limited to 'db/db_iter.cc')
-rw-r--r--db/db_iter.cc412
1 files changed, 412 insertions, 0 deletions
diff --git a/db/db_iter.cc b/db/db_iter.cc
new file mode 100644
index 0000000..c23de22
--- /dev/null
+++ b/db/db_iter.cc
@@ -0,0 +1,412 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/db_iter.h"
+
+#include "db/filename.h"
+#include "db/dbformat.h"
+#include "include/env.h"
+#include "include/iterator.h"
+#include "port/port.h"
+#include "util/logging.h"
+#include "util/mutexlock.h"
+
+namespace leveldb {
+
+#if 0
+static void DumpInternalIter(Iterator* iter) {
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ ParsedInternalKey k;
+ if (!ParseInternalKey(iter->key(), &k)) {
+ fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
+ } else {
+ fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
+ }
+ }
+}
+#endif
+
+namespace {
+
+// Memtables and sstables that make the DB representation contain
+// (userkey,seq,type) => uservalue entries. DBIter
+// combines multiple entries for the same userkey found in the DB
+// representation into a single entry while accounting for sequence
+// numbers, deletion markers, overwrites, etc.
+class DBIter: public Iterator {
+ public:
+ DBIter(const std::string* dbname, Env* env,
+ const Comparator* cmp, Iterator* iter, SequenceNumber s)
+ : dbname_(dbname),
+ env_(env),
+ user_comparator_(cmp),
+ iter_(iter),
+ sequence_(s),
+ large_(NULL),
+ valid_(false) {
+ }
+ virtual ~DBIter() {
+ delete iter_;
+ delete large_;
+ }
+ virtual bool Valid() const { return valid_; }
+ virtual Slice key() const {
+ assert(valid_);
+ return key_;
+ }
+ virtual Slice value() const {
+ assert(valid_);
+ if (large_ == NULL) {
+ return value_;
+ } else {
+ MutexLock l(&large_->mutex);
+ if (!large_->produced) {
+ ReadIndirectValue();
+ }
+ return large_->value;
+ }
+ }
+
+ virtual void Next() {
+ assert(valid_);
+ // iter_ is already positioned past DBIter::key()
+ FindNextUserEntry();
+ }
+
+ virtual void Prev() {
+ assert(valid_);
+ bool ignored;
+ ScanUntilBeforeCurrentKey(&ignored);
+ FindPrevUserEntry();
+ }
+
+ virtual void Seek(const Slice& target) {
+ ParsedInternalKey ikey(target, sequence_, kValueTypeForSeek);
+ std::string tmp;
+ AppendInternalKey(&tmp, ikey);
+ iter_->Seek(tmp);
+ FindNextUserEntry();
+ }
+ virtual void SeekToFirst() {
+ iter_->SeekToFirst();
+ FindNextUserEntry();
+ }
+
+ virtual void SeekToLast();
+
+ virtual Status status() const {
+ if (status_.ok()) {
+ if (large_ != NULL && !large_->status.ok()) return large_->status;
+ return iter_->status();
+ } else {
+ return status_;
+ }
+ }
+
+ private:
+ void FindNextUserEntry();
+ void FindPrevUserEntry();
+ void SaveKey(const Slice& k) { key_.assign(k.data(), k.size()); }
+ void SaveValue(const Slice& v) {
+ if (value_.capacity() > v.size() + 1048576) {
+ std::string empty;
+ swap(empty, value_);
+ }
+ value_.assign(v.data(), v.size());
+ }
+ bool ParseKey(ParsedInternalKey* key);
+ void SkipPast(const Slice& k);
+ void ScanUntilBeforeCurrentKey(bool* found_live);
+
+ void ReadIndirectValue() const;
+
+ struct Large {
+ port::Mutex mutex;
+ std::string value;
+ bool produced;
+ Status status;
+ };
+
+ const std::string* const dbname_;
+ Env* const env_;
+
+ const Comparator* const user_comparator_;
+
+ // iter_ is positioned just past current entry for DBIter if valid_
+ Iterator* const iter_;
+
+ SequenceNumber const sequence_;
+ Status status_;
+ std::string key_; // Always a user key
+ std::string value_;
+ Large* large_; // Non-NULL if value is an indirect reference
+ bool valid_;
+
+ // No copying allowed
+ DBIter(const DBIter&);
+ void operator=(const DBIter&);
+};
+
+inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
+ if (!ParseInternalKey(iter_->key(), ikey)) {
+ status_ = Status::Corruption("corrupted internal key in DBIter");
+ return false;
+ } else {
+ return true;
+ }
+}
+
+void DBIter::FindNextUserEntry() {
+ if (large_ != NULL) {
+ if (status_.ok() && !large_->status.ok()) {
+ status_ = large_->status;
+ }
+ delete large_;
+ large_ = NULL;
+ }
+ while (iter_->Valid()) {
+ ParsedInternalKey ikey;
+ if (!ParseKey(&ikey)) {
+ // Skip past corrupted entry
+ iter_->Next();
+ continue;
+ }
+ if (ikey.sequence > sequence_) {
+ // Ignore entries newer than the snapshot
+ iter_->Next();
+ continue;
+ }
+
+ switch (ikey.type) {
+ case kTypeDeletion:
+ SaveKey(ikey.user_key); // Make local copy for use by SkipPast()
+ iter_->Next();
+ SkipPast(key_);
+ // Do not return deleted entries. Instead keep looping.
+ break;
+
+ case kTypeValue:
+ SaveKey(ikey.user_key);
+ SaveValue(iter_->value());
+ iter_->Next();
+ SkipPast(key_);
+ // Yield the value we just found.
+ valid_ = true;
+ return;
+
+ case kTypeLargeValueRef:
+ SaveKey(ikey.user_key);
+ // Save the large value ref as value_, and read it lazily on a call
+ // to value()
+ SaveValue(iter_->value());
+ large_ = new Large;
+ large_->produced = false;
+ iter_->Next();
+ SkipPast(key_);
+ // Yield the value we just found.
+ valid_ = true;
+ return;
+ }
+ }
+ valid_ = false;
+ key_.clear();
+ value_.clear();
+ assert(large_ == NULL);
+}
+
+void DBIter::SkipPast(const Slice& k) {
+ while (iter_->Valid()) {
+ ParsedInternalKey ikey;
+ // Note that if we cannot parse an internal key, we keep looping
+ // so that if we have a run like the following:
+ // <x,100,v> => value100
+ // <corrupted entry for user key x>
+ // <x,50,v> => value50
+ // we will skip over the corrupted entry as well as value50.
+ if (ParseKey(&ikey) && user_comparator_->Compare(ikey.user_key, k) != 0) {
+ break;
+ }
+ iter_->Next();
+ }
+}
+
+void DBIter::SeekToLast() {
+ // Position iter_ at the last uncorrupted user key and then
+ // let FindPrevUserEntry() do the heavy lifting to find
+ // a user key that is live.
+ iter_->SeekToLast();
+ ParsedInternalKey current;
+ while (iter_->Valid() && !ParseKey(&current)) {
+ iter_->Prev();
+ }
+ if (iter_->Valid()) {
+ SaveKey(current.user_key);
+ }
+ FindPrevUserEntry();
+}
+
+// Let X be the user key at which iter_ is currently positioned.
+// Adjust DBIter to point at the last entry with a key <= X that
+// has a live value.
+void DBIter::FindPrevUserEntry() {
+ // Consider the following example:
+ //
+ // A@540
+ // A@400
+ //
+ // B@300
+ // B@200
+ // B@100 <- iter_
+ //
+ // C@301
+ // C@201
+ //
+ // The comments marked "(first iteration)" below relate what happens
+ // for the preceding example in the first iteration of the while loop
+ // below. There may be more than one iteration either if there are
+ // no live values for B, or if there is a corruption.
+ while (iter_->Valid()) {
+ std::string saved = key_;
+ bool found_live;
+ ScanUntilBeforeCurrentKey(&found_live);
+ // (first iteration) iter_ at A@400
+ if (found_live) {
+ // Step forward into range of entries with user key >= saved
+ if (!iter_->Valid()) {
+ iter_->SeekToFirst();
+ } else {
+ iter_->Next();
+ }
+ // (first iteration) iter_ at B@300
+
+ FindNextUserEntry(); // Sets key_ to the key of the next value it found
+ if (valid_ && user_comparator_->Compare(key_, saved) == 0) {
+ // (first iteration) iter_ at C@301
+ return;
+ }
+
+ // FindNextUserEntry() could not find any entries under the
+ // user key "saved". This is probably a corruption since
+ // ScanUntilBefore(saved) found a live value. So we skip
+ // backwards to an earlier key and ignore the corrupted
+ // entries for "saved".
+ //
+ // (first iteration) iter_ at C@301 and saved == "B"
+ key_ = saved;
+ bool ignored;
+ ScanUntilBeforeCurrentKey(&ignored);
+ // (first iteration) iter_ at A@400
+ }
+ }
+ valid_ = false;
+ key_.clear();
+ value_.clear();
+}
+
+void DBIter::ScanUntilBeforeCurrentKey(bool* found_live) {
+ *found_live = false;
+ if (!iter_->Valid()) {
+ iter_->SeekToLast();
+ }
+
+ while (iter_->Valid()) {
+ ParsedInternalKey current;
+ if (!ParseKey(&current)) {
+ iter_->Prev();
+ continue;
+ }
+
+ if (current.sequence > sequence_) {
+ // Ignore entries that are serialized after this read
+ iter_->Prev();
+ continue;
+ }
+
+ const int cmp = user_comparator_->Compare(current.user_key, key_);
+ if (cmp < 0) {
+ SaveKey(current.user_key);
+ return;
+ } else if (cmp == 0) {
+ switch (current.type) {
+ case kTypeDeletion:
+ *found_live = false;
+ break;
+
+ case kTypeValue:
+ case kTypeLargeValueRef:
+ *found_live = true;
+ break;
+ }
+ } else { // cmp > 0
+ *found_live = false;
+ }
+
+ iter_->Prev();
+ }
+}
+
+void DBIter::ReadIndirectValue() const {
+ assert(!large_->produced);
+ large_->produced = true;
+ LargeValueRef large_ref;
+ if (value_.size() != LargeValueRef::ByteSize()) {
+ large_->status = Status::Corruption("malformed large value reference");
+ return;
+ }
+ memcpy(large_ref.data, value_.data(), LargeValueRef::ByteSize());
+ std::string fname = LargeValueFileName(*dbname_, large_ref);
+ RandomAccessFile* file;
+ Status s = env_->NewRandomAccessFile(fname, &file);
+ if (s.ok()) {
+ uint64_t file_size = file->Size();
+ uint64_t value_size = large_ref.ValueSize();
+ large_->value.resize(value_size);
+ Slice result;
+ s = file->Read(0, file_size, &result,
+ const_cast<char*>(large_->value.data()));
+ if (s.ok()) {
+ if (result.size() == file_size) {
+ switch (large_ref.compression_type()) {
+ case kNoCompression: {
+ if (result.data() != large_->value.data()) {
+ large_->value.assign(result.data(), result.size());
+ }
+ break;
+ }
+ case kLightweightCompression: {
+ std::string uncompressed;
+ if (port::Lightweight_Uncompress(result.data(), result.size(),
+ &uncompressed) &&
+ uncompressed.size() == large_ref.ValueSize()) {
+ swap(uncompressed, large_->value);
+ } else {
+ s = Status::Corruption(
+ "Unable to read entire compressed large value file");
+ }
+ }
+ }
+ } else {
+ s = Status::Corruption("Unable to read entire large value file");
+ }
+ }
+ delete file; // Ignore errors on closing
+ }
+ if (!s.ok()) {
+ large_->value.clear();
+ large_->status = s;
+ }
+}
+
+} // anonymous namespace
+
+Iterator* NewDBIterator(
+ const std::string* dbname,
+ Env* env,
+ const Comparator* user_key_comparator,
+ Iterator* internal_iter,
+ const SequenceNumber& sequence) {
+ return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence);
+}
+
+}