summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgabor@google.com <gabor@google.com@62dab493-f737-651d-591e-8d6aee1b9529>2011-06-22 02:36:45 +0000
committergabor@google.com <gabor@google.com@62dab493-f737-651d-591e-8d6aee1b9529>2011-06-22 02:36:45 +0000
commitccf0fcd5c2946f9228068d657a56d91af9671575 (patch)
tree876b8a3e734972320aced5f0b33bf8bc34c5f101
parent80e5b0d944b7651046f8b0e795065eca02a01e59 (diff)
downloadleveldb-ccf0fcd5c2946f9228068d657a56d91af9671575.tar.gz
A number of smaller fixes and performance improvements:
- Implemented Get() directly instead of building on top of a full merging iterator stack. This speeds up the "readrandom" benchmark by up to 15-30%. - Fixed an opensource compilation problem. Added --db=<name> flag to control where the database is placed. - Automatically compact a file when we have done enough overlapping seeks to that file. - Fixed a performance bug where we would read from at least one file in a level even if none of the files overlapped the key being read. - Makefile fix for Mac OSX installations that have XCode 4 without XCode 3. - Unified the two occurrences of binary search in a file-list into one routine. - Found and fixed a bug where we would unnecessarily search the last file when looking for a key larger than all data in the level. - A fix to avoid the need for trivial move compactions and therefore gets rid of two out of five syncs in "fillseq". - Removed the MANIFEST file write when switching to a new memtable/log-file for a 10-20% improvement on fill speed on ext4. - Adding a SNAPPY setting in the Makefile for folks who have Snappy installed. Snappy compresses values and speeds up writes. git-svn-id: https://leveldb.googlecode.com/svn/trunk@32 62dab493-f737-651d-591e-8d6aee1b9529
-rw-r--r--Makefile28
-rw-r--r--TODO7
-rw-r--r--db/builder.cc6
-rw-r--r--db/builder.h10
-rw-r--r--db/corruption_test.cc31
-rw-r--r--db/db_bench.cc19
-rw-r--r--db/db_impl.cc135
-rw-r--r--db/db_impl.h3
-rw-r--r--db/db_test.cc219
-rw-r--r--db/dbformat.cc19
-rw-r--r--db/dbformat.h40
-rw-r--r--db/memtable.cc37
-rw-r--r--db/memtable.h6
-rw-r--r--db/repair.cc6
-rw-r--r--db/version_edit.h3
-rw-r--r--db/version_set.cc281
-rw-r--r--db/version_set.h49
-rw-r--r--port/port_posix.h22
-rw-r--r--table/table_test.cc14
19 files changed, 783 insertions, 152 deletions
diff --git a/Makefile b/Makefile
index 921b71c..84f77ab 100644
--- a/Makefile
+++ b/Makefile
@@ -28,9 +28,22 @@ PLATFORM_CFLAGS = -DLEVELDB_PLATFORM_POSIX -std=c++0x
PORT_MODULE = port_posix.o
endif # UNAME
-CFLAGS = -c -I. -I./include $(PLATFORM_CFLAGS) $(OPT)
+# Set 'SNAPPY' to 1 if you have the Snappy compression library
+# installed and want to enable its use in LevelDB
+# (see http://code.google.com/p/snappy/)
+SNAPPY=0
+
+ifeq ($(SNAPPY), 0)
+SNAPPY_CFLAGS=
+SNAPPY_LDFLAGS=
+else
+SNAPPY_CFLAGS=-DSNAPPY
+SNAPPY_LDFLAGS=-lsnappy
+endif
-LDFLAGS=-lpthread
+CFLAGS = -c -I. -I./include $(PLATFORM_CFLAGS) $(OPT) $(SNAPPY_CFLAGS)
+
+LDFLAGS=-lpthread $(SNAPPY_LDFLAGS)
LIBOBJECTS = \
./db/builder.o \
@@ -85,6 +98,7 @@ TESTS = \
skiplist_test \
table_test \
version_edit_test \
+ version_set_test \
write_batch_test
PROGRAMS = db_bench $(TESTS)
@@ -151,17 +165,23 @@ skiplist_test: db/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS)
version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CC) $(LDFLAGS) db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@
+version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS)
+ $(CC) $(LDFLAGS) db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@
+
write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CC) $(LDFLAGS) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@
ifeq ($(PLATFORM), IOS)
# For iOS, create universal object files to be used on both the simulator and
# a device.
+SIMULATORROOT=/Developer/Platforms/iPhoneSimulator.platform/Developer
+DEVICEROOT=/Developer/Platforms/iPhoneOS.platform/Developer
+IOSVERSION=$(shell defaults read /Developer/Platforms/iPhoneOS.platform/version CFBundleShortVersionString)
.cc.o:
mkdir -p ios-x86/$(dir $@)
- $(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneSimulator.platform/Developer/SDKs/iPhoneSimulator4.3.sdk -arch i686 $< -o ios-x86/$@
+ $(SIMULATORROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@)
- $(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS4.3.sdk -arch armv6 -arch armv7 $< -o ios-arm/$@
+ $(DEVICEROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@
else
.cc.o:
diff --git a/TODO b/TODO
index ce81439..9130b6a 100644
--- a/TODO
+++ b/TODO
@@ -8,7 +8,6 @@ db
object stores, etc. can be done in the background anyway, so
probably not that important.
-api changes:
-- Make it wrappable
-
-Faster Get implementation
+After a range is completely deleted, what gets rid of the
+corresponding files if we do no future changes to that range. Make
+the conditions for triggering compactions fire in more situations?
diff --git a/db/builder.cc b/db/builder.cc
index 9f132d7..34a7b87 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -19,8 +19,7 @@ Status BuildTable(const std::string& dbname,
const Options& options,
TableCache* table_cache,
Iterator* iter,
- FileMetaData* meta,
- VersionEdit* edit) {
+ FileMetaData* meta) {
Status s;
meta->file_size = 0;
iter->SeekToFirst();
@@ -79,8 +78,7 @@ Status BuildTable(const std::string& dbname,
}
if (s.ok() && meta->file_size > 0) {
- edit->AddFile(0, meta->number, meta->file_size,
- meta->smallest, meta->largest);
+ // Keep it
} else {
env->DeleteFile(fname);
}
diff --git a/db/builder.h b/db/builder.h
index 5dd17b6..b2aeabf 100644
--- a/db/builder.h
+++ b/db/builder.h
@@ -19,17 +19,15 @@ class VersionEdit;
// Build a Table file from the contents of *iter. The generated file
// will be named according to meta->number. On success, the rest of
-// *meta will be filled with metadata about the generated table, and
-// the file information will be added to *edit. If no data is present
-// in *iter, meta->file_size will be set to zero, and no Table file
-// will be produced.
+// *meta will be filled with metadata about the generated table.
+// If no data is present in *iter, meta->file_size will be set to
+// zero, and no Table file will be produced.
extern Status BuildTable(const std::string& dbname,
Env* env,
const Options& options,
TableCache* table_cache,
Iterator* iter,
- FileMetaData* meta,
- VersionEdit* edit);
+ FileMetaData* meta);
}
diff --git a/db/corruption_test.cc b/db/corruption_test.cc
index 12d176e..8015101 100644
--- a/db/corruption_test.cc
+++ b/db/corruption_test.cc
@@ -27,13 +27,12 @@ static const int kValueSize = 1000;
class CorruptionTest {
public:
test::ErrorEnv env_;
- Random rnd_;
std::string dbname_;
Cache* tiny_cache_;
Options options_;
DB* db_;
- CorruptionTest() : rnd_(test::RandomSeed()) {
+ CorruptionTest() {
tiny_cache_ = NewLRUCache(100);
options_.env = &env_;
dbname_ = test::TmpDir() + "/db_test";
@@ -122,15 +121,17 @@ class CorruptionTest {
ASSERT_OK(env_.GetChildren(dbname_, &filenames));
uint64_t number;
FileType type;
- std::vector<std::string> candidates;
+ std::string fname;
+ int picked_number = -1;
for (int i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) &&
- type == filetype) {
- candidates.push_back(dbname_ + "/" + filenames[i]);
+ type == filetype &&
+ int(number) > picked_number) { // Pick latest file
+ fname = dbname_ + "/" + filenames[i];
+ picked_number = number;
}
}
- ASSERT_TRUE(!candidates.empty()) << filetype;
- std::string fname = candidates[rnd_.Uniform(candidates.size())];
+ ASSERT_TRUE(!fname.empty()) << filetype;
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
@@ -239,8 +240,6 @@ TEST(CorruptionTest, TableFileIndexData) {
Build(10000); // Enough to build multiple Tables
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
- dbi->TEST_CompactRange(0, "", "~");
- dbi->TEST_CompactRange(1, "", "~");
Corrupt(kTableFile, -2000, 500);
Reopen();
@@ -296,7 +295,8 @@ TEST(CorruptionTest, CompactionInputError) {
Build(10);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
- ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));
+ const int last = config::kNumLevels - 1;
+ ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
Corrupt(kTableFile, 100, 1);
Check(9, 9);
@@ -304,8 +304,6 @@ TEST(CorruptionTest, CompactionInputError) {
// Force compactions by writing lots of values
Build(10000);
Check(10000, 10000);
- dbi->TEST_CompactRange(0, "", "~");
- ASSERT_EQ(0, Property("leveldb.num-files-at-level0"));
}
TEST(CorruptionTest, CompactionInputErrorParanoid) {
@@ -313,9 +311,16 @@ TEST(CorruptionTest, CompactionInputErrorParanoid) {
options.paranoid_checks = true;
options.write_buffer_size = 1048576;
Reopen(&options);
+ DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
+
+ // Fill levels >= 1 so memtable compaction outputs to level 1
+ for (int level = 1; level < config::kNumLevels; level++) {
+ dbi->Put(WriteOptions(), "", "begin");
+ dbi->Put(WriteOptions(), "~", "end");
+ dbi->TEST_CompactMemTable();
+ }
Build(10);
- DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));
diff --git a/db/db_bench.cc b/db/db_bench.cc
index b24179d..53b8c53 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -86,6 +86,9 @@ static int FLAGS_open_files = 0;
// benchmark will fail.
static bool FLAGS_use_existing_db = false;
+// Use the db with the following name.
+static const char* FLAGS_db = "/tmp/dbbench";
+
namespace leveldb {
// Helper for quickly generating random data.
@@ -318,14 +321,14 @@ class Benchmark {
bytes_(0),
rand_(301) {
std::vector<std::string> files;
- Env::Default()->GetChildren("/tmp/dbbench", &files);
+ Env::Default()->GetChildren(FLAGS_db, &files);
for (int i = 0; i < files.size(); i++) {
if (Slice(files[i]).starts_with("heap-")) {
- Env::Default()->DeleteFile("/tmp/dbbench/" + files[i]);
+ Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
}
}
if (!FLAGS_use_existing_db) {
- DestroyDB("/tmp/dbbench", Options());
+ DestroyDB(FLAGS_db, Options());
}
}
@@ -364,7 +367,7 @@ class Benchmark {
Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size, 1);
} else if (name == Slice("fillsync")) {
write_options.sync = true;
- Write(write_options, RANDOM, FRESH, num_ / 100, FLAGS_value_size, 1);
+ Write(write_options, RANDOM, FRESH, num_ / 1000, FLAGS_value_size, 1);
} else if (name == Slice("fill100K")) {
Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000, 1);
} else if (name == Slice("readseq")) {
@@ -490,7 +493,7 @@ class Benchmark {
options.create_if_missing = !FLAGS_use_existing_db;
options.block_cache = cache_;
options.write_buffer_size = FLAGS_write_buffer_size;
- Status s = DB::Open(options, "/tmp/dbbench", &db_);
+ Status s = DB::Open(options, FLAGS_db, &db_);
if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
exit(1);
@@ -506,7 +509,7 @@ class Benchmark {
}
delete db_;
db_ = NULL;
- DestroyDB("/tmp/dbbench", Options());
+ DestroyDB(FLAGS_db, Options());
Open();
Start(); // Do not count time taken to destroy/open
}
@@ -617,7 +620,7 @@ class Benchmark {
void HeapProfile() {
char fname[100];
- snprintf(fname, sizeof(fname), "/tmp/dbbench/heap-%04d", ++heap_counter_);
+ snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
WritableFile* file;
Status s = Env::Default()->NewWritableFile(fname, &file);
if (!s.ok()) {
@@ -665,6 +668,8 @@ int main(int argc, char** argv) {
FLAGS_cache_size = n;
} else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
FLAGS_open_files = n;
+ } else if (strncmp(argv[i], "--db=", 5) == 0) {
+ FLAGS_db = argv[i] + 5;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);
diff --git a/db/db_impl.cc b/db/db_impl.cc
index abcc761..7556d5a 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -122,6 +122,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
mem_(new MemTable(internal_comparator_)),
imm_(NULL),
logfile_(NULL),
+ logfile_number_(0),
log_(NULL),
bg_compaction_scheduled_(false),
manual_compaction_(NULL) {
@@ -219,7 +220,7 @@ void DBImpl::DeleteObsoleteFiles() {
bool keep = true;
switch (type) {
case kLogFile:
- keep = ((number == versions_->LogNumber()) ||
+ keep = ((number >= versions_->LogNumber()) ||
(number == versions_->PrevLogNumber()));
break;
case kDescriptorFile:
@@ -287,14 +288,39 @@ Status DBImpl::Recover(VersionEdit* edit) {
s = versions_->Recover();
if (s.ok()) {
- // Recover from the log files named in the descriptor
SequenceNumber max_sequence(0);
- if (versions_->PrevLogNumber() != 0) { // log#==0 means no prev log
- s = RecoverLogFile(versions_->PrevLogNumber(), edit, &max_sequence);
+
+ // Recover from all newer log files than the ones named in the
+ // descriptor (new log files may have been added by the previous
+ // incarnation without registering them in the descriptor).
+ //
+ // Note that PrevLogNumber() is no longer used, but we pay
+ // attention to it in case we are recovering a database
+ // produced by an older version of leveldb.
+ const uint64_t min_log = versions_->LogNumber();
+ const uint64_t prev_log = versions_->PrevLogNumber();
+ std::vector<std::string> filenames;
+ s = env_->GetChildren(dbname_, &filenames);
+ if (!s.ok()) {
+ return s;
+ }
+ uint64_t number;
+ FileType type;
+ std::vector<uint64_t> logs;
+ for (size_t i = 0; i < filenames.size(); i++) {
+ if (ParseFileName(filenames[i], &number, &type)
+ && type == kLogFile
+ && ((number >= min_log) || (number == prev_log))) {
+ logs.push_back(number);
+ }
}
- if (s.ok() && versions_->LogNumber() != 0) { // log#==0 for initial state
- s = RecoverLogFile(versions_->LogNumber(), edit, &max_sequence);
+
+ // Recover in the order in which the logs were generated
+ std::sort(logs.begin(), logs.end());
+ for (size_t i = 0; i < logs.size(); i++) {
+ s = RecoverLogFile(logs[i], edit, &max_sequence);
}
+
if (s.ok()) {
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
@@ -378,7 +404,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
}
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
- status = WriteLevel0Table(mem, edit);
+ status = WriteLevel0Table(mem, edit, NULL);
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
@@ -390,7 +416,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
}
if (status.ok() && mem != NULL) {
- status = WriteLevel0Table(mem, edit);
+ status = WriteLevel0Table(mem, edit, NULL);
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
}
@@ -400,7 +426,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
return status;
}
-Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) {
+Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
+ Version* base) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
@@ -413,7 +440,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) {
Status s;
{
mutex_.Unlock();
- s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, edit);
+ s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
mutex_.Lock();
}
@@ -424,10 +451,26 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) {
delete iter;
pending_outputs_.erase(meta.number);
+
+ // Note that if file_size is zero, the file has been deleted and
+ // should not be added to the manifest.
+ int level = 0;
+ if (s.ok() && meta.file_size > 0) {
+ if (base != NULL && !base->OverlapInLevel(0, meta.smallest, meta.largest)) {
+ // Push to largest level we can without causing overlaps
+ while (level + 1 < config::kNumLevels &&
+ !base->OverlapInLevel(level + 1, meta.smallest, meta.largest)) {
+ level++;
+ }
+ }
+ edit->AddFile(level, meta.number, meta.file_size,
+ meta.smallest, meta.largest);
+ }
+
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
- stats_[0].Add(stats);
+ stats_[level].Add(stats);
return s;
}
@@ -437,11 +480,19 @@ Status DBImpl::CompactMemTable() {
// Save the contents of the memtable as a new Table
VersionEdit edit;
- Status s = WriteLevel0Table(imm_, &edit);
+ Version* base = versions_->current();
+ base->Ref();
+ Status s = WriteLevel0Table(imm_, &edit, base);
+ base->Unref();
+
+ if (s.ok() && shutting_down_.Acquire_Load()) {
+ s = Status::IOError("Deleting DB during memtable compaction");
+ }
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
+ edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit);
}
@@ -460,6 +511,9 @@ void DBImpl::TEST_CompactRange(
int level,
const std::string& begin,
const std::string& end) {
+ assert(level >= 0);
+ assert(level + 1 < config::kNumLevels);
+
MutexLock l(&mutex_);
while (manual_compaction_ != NULL) {
bg_cv_.Wait();
@@ -934,22 +988,38 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
- // TODO(opt): faster implementation
- Iterator* iter = NewIterator(options);
- iter->Seek(key);
- bool found = false;
- if (iter->Valid() && user_comparator()->Compare(key, iter->key()) == 0) {
- Slice v = iter->value();
- value->assign(v.data(), v.size());
- found = true;
- }
- // Non-OK iterator status trumps everything else
- Status result = iter->status();
- if (result.ok() && !found) {
- result = Status::NotFound(Slice()); // Use an empty error message for speed
+ Status s;
+ MutexLock l(&mutex_);
+ SequenceNumber snapshot;
+ if (options.snapshot != NULL) {
+ snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
+ } else {
+ snapshot = versions_->LastSequence();
}
- delete iter;
- return result;
+
+ // First look in the memtable, then in the immutable memtable (if any).
+ LookupKey lkey(key, snapshot);
+ if (mem_->Get(lkey, value, &s)) {
+ return s;
+ }
+ if (imm_ != NULL && imm_->Get(lkey, value, &s)) {
+ return s;
+ }
+
+ // Not in memtable(s); try live files in level order
+ Version* current = versions_->current();
+ current->Ref();
+ Version::GetStats stats;
+ { // Unlock while reading from files
+ mutex_.Unlock();
+ s = current->Get(options, lkey, value, &stats);
+ mutex_.Lock();
+ }
+ if (current->UpdateStats(stats)) {
+ MaybeScheduleCompaction();
+ }
+ current->Unref();
+ return s;
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
@@ -1050,18 +1120,10 @@ Status DBImpl::MakeRoomForWrite(bool force) {
if (!s.ok()) {
break;
}
- VersionEdit edit;
- edit.SetPrevLogNumber(versions_->LogNumber());
- edit.SetLogNumber(new_log_number);
- s = versions_->LogAndApply(&edit);
- if (!s.ok()) {
- delete lfile;
- env_->DeleteFile(LogFileName(dbname_, new_log_number));
- break;
- }
delete log_;
delete logfile_;
logfile_ = lfile;
+ logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.Release_Store(imm_);
@@ -1183,6 +1245,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
+ impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
s = impl->versions_->LogAndApply(&edit);
}
diff --git a/db/db_impl.h b/db/db_impl.h
index 84ce154..f11ea55 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -85,7 +85,7 @@ class DBImpl : public DB {
VersionEdit* edit,
SequenceNumber* max_sequence);
- Status WriteLevel0Table(MemTable* mem, VersionEdit* edit);
+ Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base);
Status MakeRoomForWrite(bool force /* compact even if there is room? */);
@@ -124,6 +124,7 @@ class DBImpl : public DB {
MemTable* imm_; // Memtable being compacted
port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_
WritableFile* logfile_;
+ uint64_t logfile_number_;
log::Writer* log_;
SnapshotList snapshots_;
diff --git a/db/db_test.cc b/db/db_test.cc
index 42e70cf..d5d60cd 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -21,15 +21,57 @@ static std::string RandomString(Random* rnd, int len) {
return r;
}
+// Special Env used to delay background operations
+class SpecialEnv : public EnvWrapper {
+ public:
+ // sstable Sync() calls are blocked while this pointer is non-NULL.
+ port::AtomicPointer delay_sstable_sync_;
+
+ explicit SpecialEnv(Env* base) : EnvWrapper(base) {
+ delay_sstable_sync_.Release_Store(NULL);
+ }
+
+ Status NewWritableFile(const std::string& f, WritableFile** r) {
+ class SSTableFile : public WritableFile {
+ private:
+ SpecialEnv* env_;
+ WritableFile* base_;
+
+ public:
+ SSTableFile(SpecialEnv* env, WritableFile* base)
+ : env_(env),
+ base_(base) {
+ }
+ Status Append(const Slice& data) { return base_->Append(data); }
+ Status Close() { return base_->Close(); }
+ Status Flush() { return base_->Flush(); }
+ Status Sync() {
+ while (env_->delay_sstable_sync_.Acquire_Load() != NULL) {
+ env_->SleepForMicroseconds(100000);
+ }
+ return base_->Sync();
+ }
+ };
+
+ Status s = target()->NewWritableFile(f, r);
+ if (s.ok()) {
+ if (strstr(f.c_str(), ".sst") != NULL) {
+ *r = new SSTableFile(this, *r);
+ }
+ }
+ return s;
+ }
+};
+
class DBTest {
public:
std::string dbname_;
- Env* env_;
+ SpecialEnv* env_;
DB* db_;
Options last_options_;
- DBTest() : env_(Env::Default()) {
+ DBTest() : env_(new SpecialEnv(Env::Default())) {
dbname_ = test::TmpDir() + "/db_test";
DestroyDB(dbname_, Options());
db_ = NULL;
@@ -39,6 +81,7 @@ class DBTest {
~DBTest() {
delete db_;
DestroyDB(dbname_, Options());
+ delete env_;
}
DBImpl* dbfull() {
@@ -142,6 +185,14 @@ class DBTest {
return atoi(property.c_str());
}
+ int TotalTableFiles() {
+ int result = 0;
+ for (int level = 0; level < config::kNumLevels; level++) {
+ result += NumTableFilesAtLevel(level);
+ }
+ return result;
+ }
+
uint64_t Size(const Slice& start, const Slice& limit) {
Range r(start, limit);
uint64_t size;
@@ -162,6 +213,16 @@ class DBTest {
}
}
+ // Prevent pushing of new sstables into deeper levels by adding
+ // tables that cover a specified range to all levels.
+ void FillLevels(const std::string& smallest, const std::string& largest) {
+ for (int level = 0; level < config::kNumLevels; level++) {
+ Put(smallest, "begin");
+ Put(largest, "end");
+ dbfull()->TEST_CompactMemTable();
+ }
+ }
+
void DumpFileCounts(const char* label) {
fprintf(stderr, "---\n%s:\n", label);
fprintf(stderr, "maxoverlap: %lld\n",
@@ -209,6 +270,80 @@ TEST(DBTest, PutDeleteGet) {
ASSERT_EQ("NOT_FOUND", Get("foo"));
}
+TEST(DBTest, GetFromImmutableLayer) {
+ Options options;
+ options.env = env_;
+ options.write_buffer_size = 100000; // Small write buffer
+ Reopen(&options);
+
+ ASSERT_OK(Put("foo", "v1"));
+ ASSERT_EQ("v1", Get("foo"));
+
+ env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls
+ Put("k1", std::string(100000, 'x')); // Fill memtable
+ Put("k2", std::string(100000, 'y')); // Trigger compaction
+ ASSERT_EQ("v1", Get("foo"));
+ env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls
+}
+
+TEST(DBTest, GetFromVersions) {
+ ASSERT_OK(Put("foo", "v1"));
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_EQ("v1", Get("foo"));
+}
+
+TEST(DBTest, GetSnapshot) {
+ // Try with both a short key and a long key
+ for (int i = 0; i < 2; i++) {
+ std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
+ ASSERT_OK(Put(key, "v1"));
+ const Snapshot* s1 = db_->GetSnapshot();
+ ASSERT_OK(Put(key, "v2"));
+ ASSERT_EQ("v2", Get(key));
+ ASSERT_EQ("v1", Get(key, s1));
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_EQ("v2", Get(key));
+ ASSERT_EQ("v1", Get(key, s1));
+ db_->ReleaseSnapshot(s1);
+ }
+}
+
+TEST(DBTest, GetLevel0Ordering) {
+ // Check that we process level-0 files in correct order. The code
+ // below generates two level-0 files where the earlier one comes
+ // before the later one in the level-0 file list since the earlier
+ // one has a smaller "smallest" key.
+ ASSERT_OK(Put("bar", "b"));
+ ASSERT_OK(Put("foo", "v1"));
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_OK(Put("foo", "v2"));
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_EQ("v2", Get("foo"));
+}
+
+TEST(DBTest, GetOrderedByLevels) {
+ ASSERT_OK(Put("foo", "v1"));
+ Compact("a", "z");
+ ASSERT_EQ("v1", Get("foo"));
+ ASSERT_OK(Put("foo", "v2"));
+ ASSERT_EQ("v2", Get("foo"));
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_EQ("v2", Get("foo"));
+}
+
+TEST(DBTest, GetPicksCorrectFile) {
+ // Arrange to have multiple files in a non-level-0 level.
+ ASSERT_OK(Put("a", "va"));
+ Compact("a", "b");
+ ASSERT_OK(Put("x", "vx"));
+ Compact("x", "y");
+ ASSERT_OK(Put("f", "vf"));
+ Compact("f", "g");
+ ASSERT_EQ("va", Get("a"));
+ ASSERT_EQ("vf", Get("f"));
+ ASSERT_EQ("vx", Get("x"));
+}
+
TEST(DBTest, IterEmpty) {
Iterator* iter = db_->NewIterator(ReadOptions());
@@ -413,6 +548,27 @@ TEST(DBTest, RecoveryWithEmptyLog) {
ASSERT_EQ("v3", Get("foo"));
}
+// Check that writes done during a memtable compaction are recovered
+// if the database is shutdown during the memtable compaction.
+TEST(DBTest, RecoverDuringMemtableCompaction) {
+ Options options;
+ options.env = env_;
+ options.write_buffer_size = 1000000;
+ Reopen(&options);
+
+ // Trigger a long memtable compaction and reopen the database during it
+ ASSERT_OK(Put("foo", "v1")); // Goes to 1st log file
+ ASSERT_OK(Put("big1", std::string(10000000, 'x'))); // Fills memtable
+ ASSERT_OK(Put("big2", std::string(1000, 'y'))); // Triggers compaction
+ ASSERT_OK(Put("bar", "v2")); // Goes to new log file
+
+ Reopen(&options);
+ ASSERT_EQ("v1", Get("foo"));
+ ASSERT_EQ("v2", Get("bar"));
+ ASSERT_EQ(std::string(10000000, 'x'), Get("big1"));
+ ASSERT_EQ(std::string(1000, 'y'), Get("big2"));
+}
+
static std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key%06d", i);
@@ -426,11 +582,11 @@ TEST(DBTest, MinorCompactionsHappen) {
const int N = 500;
- int starting_num_tables = NumTableFilesAtLevel(0);
+ int starting_num_tables = TotalTableFiles();
for (int i = 0; i < N; i++) {
ASSERT_OK(Put(Key(i), Key(i) + std::string(1000, 'v')));
}
- int ending_num_tables = NumTableFilesAtLevel(0);
+ int ending_num_tables = TotalTableFiles();
ASSERT_GT(ending_num_tables, starting_num_tables);
for (int i = 0; i < N; i++) {
@@ -499,6 +655,8 @@ TEST(DBTest, SparseMerge) {
options.compression = kNoCompression;
Reopen(&options);
+ FillLevels("A", "Z");
+
// Suppose there is:
// small amount of data with prefix A
// large amount of data with prefix B
@@ -514,7 +672,8 @@ TEST(DBTest, SparseMerge) {
Put(key, value);
}
Put("C", "vc");
- Compact("", "z");
+ dbfull()->TEST_CompactMemTable();
+ dbfull()->TEST_CompactRange(0, "A", "Z");
// Make sparse update
Put("A", "va2");
@@ -675,6 +834,8 @@ TEST(DBTest, Snapshot) {
TEST(DBTest, HiddenValuesAreRemoved) {
Random rnd(301);
+ FillLevels("a", "z");
+
std::string big = RandomString(&rnd, 50000);
Put("foo", big);
Put("pastfoo", "v");
@@ -702,40 +863,54 @@ TEST(DBTest, HiddenValuesAreRemoved) {
TEST(DBTest, DeletionMarkers1) {
Put("foo", "v1");
ASSERT_OK(dbfull()->TEST_CompactMemTable());
- dbfull()->TEST_CompactRange(0, "", "z");
- dbfull()->TEST_CompactRange(1, "", "z");
- ASSERT_EQ(NumTableFilesAtLevel(2), 1); // foo => v1 is now in level 2 file
+ const int last = config::kNumLevels - 1;
+ ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level
+
+ // Place a table at level last-1 to prevent merging with preceding mutation
+ Put("a", "begin");
+ Put("z", "end");
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_EQ(NumTableFilesAtLevel(last), 1);
+ ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
+
Delete("foo");
Put("foo", "v2");
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
- ASSERT_OK(dbfull()->TEST_CompactMemTable());
+ ASSERT_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
- dbfull()->TEST_CompactRange(0, "", "z");
+ dbfull()->TEST_CompactRange(last-2, "", "z");
// DEL eliminated, but v1 remains because we aren't compacting that level
// (DEL can be eliminated because v2 hides v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
- dbfull()->TEST_CompactRange(1, "", "z");
- // Merging L1 w/ L2, so we are the base level for "foo", so DEL is removed.
- // (as is v1).
+ dbfull()->TEST_CompactRange(last-1, "", "z");
+ // Merging last-1 w/ last, so we are the base level for "foo", so
+ // DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
}
TEST(DBTest, DeletionMarkers2) {
Put("foo", "v1");
ASSERT_OK(dbfull()->TEST_CompactMemTable());
- dbfull()->TEST_CompactRange(0, "", "z");
- dbfull()->TEST_CompactRange(1, "", "z");
- ASSERT_EQ(NumTableFilesAtLevel(2), 1); // foo => v1 is now in level 2 file
+ const int last = config::kNumLevels - 1;
+ ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level
+
+ // Place a table at level last-1 to prevent merging with preceding mutation
+ Put("a", "begin");
+ Put("z", "end");
+ dbfull()->TEST_CompactMemTable();
+ ASSERT_EQ(NumTableFilesAtLevel(last), 1);
+ ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
+
Delete("foo");
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
- ASSERT_OK(dbfull()->TEST_CompactMemTable());
+ ASSERT_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
- dbfull()->TEST_CompactRange(0, "", "z");
- // DEL kept: L2 file overlaps
+ dbfull()->TEST_CompactRange(last-2, "", "z");
+ // DEL kept: "last" file overlaps
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
- dbfull()->TEST_CompactRange(1, "", "z");
- // Merging L1 w/ L2, so we are the base level for "foo", so DEL is removed.
- // (as is v1).
+ dbfull()->TEST_CompactRange(last-1, "", "z");
+ // Merging last-1 w/ last, so we are the base level for "foo", so
+ // DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
}
diff --git a/db/dbformat.cc b/db/dbformat.cc
index c12c138..af2e077 100644
--- a/db/dbformat.cc
+++ b/db/dbformat.cc
@@ -84,4 +84,23 @@ void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
}
}
+LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
+ size_t usize = user_key.size();
+ size_t needed = usize + 13; // A conservative estimate
+ char* dst;
+ if (needed <= sizeof(space_)) {
+ dst = space_;
+ } else {
+ dst = new char[needed];
+ }
+ start_ = dst;
+ dst = EncodeVarint32(dst, usize + 8);
+ kstart_ = dst;
+ memcpy(dst, user_key.data(), usize);
+ dst += usize;
+ EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
+ dst += 8;
+ end_ = dst;
+}
+
}
diff --git a/db/dbformat.h b/db/dbformat.h
index 89c4afb..97491bc 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -160,6 +160,46 @@ inline bool ParseInternalKey(const Slice& internal_key,
return (c <= static_cast<unsigned char>(kTypeValue));
}
+// A helper class useful for DBImpl::Get()
+class LookupKey {
+ public:
+ // Initialize *this for looking up user_key at a snapshot with
+ // the specified sequence number.
+ LookupKey(const Slice& user_key, SequenceNumber sequence);
+
+ ~LookupKey();
+
+ // Return a key suitable for lookup in a MemTable.
+ Slice memtable_key() const { return Slice(start_, end_ - start_); }
+
+ // Return an internal key (suitable for passing to an internal iterator)
+ Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
+
+ // Return the user key
+ Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
+
+ private:
+ // We construct a char array of the form:
+ // klength varint32 <-- start_
+ // userkey char[klength] <-- kstart_
+ // tag uint64
+ // <-- end_
+ // The array is a suitable MemTable key.
+ // The suffix starting with "userkey" can be used as an InternalKey.
+ const char* start_;
+ const char* kstart_;
+ const char* end_;
+ char space_[200]; // Avoid allocation for short keys
+
+ // No copying allowed
+ LookupKey(const LookupKey&);
+ void operator=(const LookupKey&);
+};
+
+inline LookupKey::~LookupKey() {
+ if (start_ != space_) delete[] start_;
+}
+
}
#endif // STORAGE_LEVELDB_DB_FORMAT_H_
diff --git a/db/memtable.cc b/db/memtable.cc
index 687900a..4555abb 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -105,4 +105,41 @@ void MemTable::Add(SequenceNumber s, ValueType type,
table_.Insert(buf);
}
+bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
+ Slice memkey = key.memtable_key();
+ Table::Iterator iter(&table_);
+ iter.Seek(memkey.data());
+ if (iter.Valid()) {
+ // entry format is:
+ // klength varint32
+ // userkey char[klength]
+ // tag uint64
+ // vlength varint32
+ // value char[vlength]
+ // Check that it belongs to same user key. We do not check the
+ // sequence number since the Seek() call above should have skipped
+ // all entries with overly large sequence numbers.
+ const char* entry = iter.key();
+ uint32_t key_length;
+ const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
+ if (comparator_.comparator.user_comparator()->Compare(
+ Slice(key_ptr, key_length - 8),
+ key.user_key()) == 0) {
+ // Correct user key
+ const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
+ switch (static_cast<ValueType>(tag & 0xff)) {
+ case kTypeValue: {
+ Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+ value->assign(v.data(), v.size());
+ return true;
+ }
+ case kTypeDeletion:
+ *s = Status::NotFound(Slice());
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
}
diff --git a/db/memtable.h b/db/memtable.h
index 2e9bd61..1898b5e 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -57,6 +57,12 @@ class MemTable {
const Slice& key,
const Slice& value);
+ // If memtable contains a value for key, store it in *value and return true.
+ // If memtable contains a deletion for key, store a NotFound() error
+ // in *status and return true.
+ // Else, return false.
+ bool Get(const LookupKey& key, std::string* value, Status* s);
+
private:
~MemTable(); // Private since only Unref() should be used to delete it
diff --git a/db/repair.cc b/db/repair.cc
index 4b57169..ae1b136 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -212,14 +212,12 @@ class Repairer {
}
delete lfile;
- // We ignore any version edits generated by the conversion to a Table
+ // Do not record a version edit for this conversion to a Table
// since ExtractMetaData() will also generate edits.
- VersionEdit skipped;
FileMetaData meta;
meta.number = next_file_number_++;
Iterator* iter = mem->NewIterator();
- status = BuildTable(dbname_, env_, options_, table_cache_, iter,
- &meta, &skipped);
+ status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
delete iter;
mem->Unref();
mem = NULL;
diff --git a/db/version_edit.h b/db/version_edit.h
index ab874da..a069893 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -16,12 +16,13 @@ class VersionSet;
struct FileMetaData {
int refs;
+ int allowed_seeks; // Seeks allowed until compaction
uint64_t number;
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
- FileMetaData() : refs(0), file_size(0) { }
+ FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { }
};
class VersionEdit {
diff --git a/db/version_set.cc b/db/version_set.cc
index f64ac8d..54342e4 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -75,6 +75,37 @@ Version::~Version() {
}
}
+int FindFile(const InternalKeyComparator& icmp,
+ const std::vector<FileMetaData*>& files,
+ const Slice& key) {
+ uint32_t left = 0;
+ uint32_t right = files.size();
+ while (left < right) {
+ uint32_t mid = (left + right) / 2;
+ const FileMetaData* f = files[mid];
+ if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
+ // Key at "mid.largest" is < "target". Therefore all
+ // files at or before "mid" are uninteresting.
+ left = mid + 1;
+ } else {
+ // Key at "mid.largest" is >= "target". Therefore all files
+ // after "mid" are uninteresting.
+ right = mid;
+ }
+ }
+ return right;
+}
+
+bool SomeFileOverlapsRange(
+ const InternalKeyComparator& icmp,
+ const std::vector<FileMetaData*>& files,
+ const InternalKey& smallest,
+ const InternalKey& largest) {
+ const int index = FindFile(icmp, files, smallest.Encode());
+ return ((index < files.size()) &&
+ icmp.Compare(largest, files[index]->smallest) >= 0);
+}
+
// An internal iterator. For a given version/level pair, yields
// information about the files in the level. For a given entry, key()
// is the largest key that occurs in the file, and value() is an
@@ -92,22 +123,7 @@ class Version::LevelFileNumIterator : public Iterator {
return index_ < flist_->size();
}
virtual void Seek(const Slice& target) {
- uint32_t left = 0;
- uint32_t right = flist_->size() - 1;
- while (left < right) {
- uint32_t mid = (left + right) / 2;
- int cmp = icmp_.Compare((*flist_)[mid]->largest.Encode(), target);
- if (cmp < 0) {
- // Key at "mid.largest" is < than "target". Therefore all
- // files at or before "mid" are uninteresting.
- left = mid + 1;
- } else {
- // Key at "mid.largest" is >= "target". Therefore all files
- // after "mid" are uninteresting.
- right = mid;
- }
- }
- index_ = left;
+ index_ = FindFile(icmp_, *flist_, target);
}
virtual void SeekToFirst() { index_ = 0; }
virtual void SeekToLast() {
@@ -185,6 +201,144 @@ void Version::AddIterators(const ReadOptions& options,
}
}
+// If "*iter" points at a value or deletion for user_key, store
+// either the value, or a NotFound error and return true.
+// Else return false.
+static bool GetValue(Iterator* iter, const Slice& user_key,
+ std::string* value,
+ Status* s) {
+ if (!iter->Valid()) {
+ return false;
+ }
+ ParsedInternalKey parsed_key;
+ if (!ParseInternalKey(iter->key(), &parsed_key)) {
+ *s = Status::Corruption("corrupted key for ", user_key);
+ return true;
+ }
+ if (parsed_key.user_key != user_key) {
+ return false;
+ }
+ switch (parsed_key.type) {
+ case kTypeDeletion:
+ *s = Status::NotFound(Slice()); // Use an empty error message for speed
+ break;
+ case kTypeValue: {
+ Slice v = iter->value();
+ value->assign(v.data(), v.size());
+ break;
+ }
+ }
+ return true;
+}
+
+static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
+ return a->number > b->number;
+}
+
+Status Version::Get(const ReadOptions& options,
+ const LookupKey& k,
+ std::string* value,
+ GetStats* stats) {
+ Slice ikey = k.internal_key();
+ Slice user_key = k.user_key();
+ const Comparator* ucmp = vset_->icmp_.user_comparator();
+ Status s;
+
+ stats->seek_file = NULL;
+ stats->seek_file_level = -1;
+ FileMetaData* last_file_read = NULL;
+
+ // We can search level-by-level since entries never hop across
+ // levels. Therefore we are guaranteed that if we find data
+ // in an smaller level, later levels are irrelevant.
+ std::vector<FileMetaData*> tmp;
+ FileMetaData* tmp2;
+ for (int level = 0; level < config::kNumLevels; level++) {
+ size_t num_files = files_[level].size();
+ if (num_files == 0) continue;
+
+ // Get the list of files to search in this level
+ FileMetaData* const* files = &files_[level][0];
+ if (level == 0) {
+ // Level-0 files may overlap each other. Find all files that
+ // overlap user_key and process them in order from newest to oldest.
+ tmp.reserve(num_files);
+ for (int i = 0; i < num_files; i++) {
+ FileMetaData* f = files[i];
+ if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
+ ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
+ tmp.push_back(f);
+ }
+ }
+ if (tmp.empty()) continue;
+
+ std::sort(tmp.begin(), tmp.end(), NewestFirst);
+ files = &tmp[0];
+ num_files = tmp.size();
+ } else {
+ // Binary search to find earliest index whose largest key >= ikey.
+ uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
+ if (index >= num_files) {
+ files = NULL;
+ num_files = 0;
+ } else {
+ tmp2 = files[index];
+ if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
+ // All of "tmp2" is past any data for user_key
+ files = NULL;
+ num_files = 0;
+ } else {
+ files = &tmp2;
+ num_files = 1;
+ }
+ }
+ }
+
+ for (int i = 0; i < num_files; ++i) {
+ if (last_file_read != NULL && stats->seek_file == NULL) {
+ // We have had more than one seek for this read. Charge the 1st file.
+ stats->seek_file = last_file_read;
+ stats->seek_file_level = (i == 0 ? level - 1 : level);
+ }
+
+ FileMetaData* f = files[i];
+ last_file_read = f;
+
+ Iterator* iter = vset_->table_cache_->NewIterator(
+ options,
+ f->number,
+ f->file_size);
+ iter->Seek(ikey);
+ const bool done = GetValue(iter, user_key, value, &s);
+ if (!iter->status().ok()) {
+ s = iter->status();
+ delete iter;
+ return s;
+ } else {
+ delete iter;
+ if (done) {
+ return s;
+ }
+ }
+ }
+ }
+
+ return Status::NotFound(Slice()); // Use an empty error message for speed
+}
+
+bool Version::UpdateStats(const GetStats& stats) {
+ FileMetaData* f = stats.seek_file;
+ if (f != NULL) {
+ f->allowed_seeks--;
+ if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
+ file_to_compact_ = f;
+ file_to_compact_level_ = stats.seek_file_level;
+ return true;
+ }
+ }
+ return false;
+}
+
void Version::Ref() {
++refs_;
}
@@ -198,13 +352,22 @@ void Version::Unref() {
}
}
+bool Version::OverlapInLevel(int level,
+ const InternalKey& smallest,
+ const InternalKey& largest) {
+ return SomeFileOverlapsRange(vset_->icmp_, files_[level], smallest, largest);
+}
+
std::string Version::DebugString() const {
std::string r;
for (int level = 0; level < config::kNumLevels; level++) {
- // E.g., level 1: 17:123['a' .. 'd'] 20:43['e' .. 'g']
- r.append("level ");
+ // E.g.,
+ // --- level 1 ---
+ // 17:123['a' .. 'd']
+ // 20:43['e' .. 'g']
+ r.append("--- level ");
AppendNumberTo(&r, level);
- r.push_back(':');
+ r.append(" ---\n");
const std::vector<FileMetaData*>& files = files_[level];
for (size_t i = 0; i < files.size(); i++) {
r.push_back(' ');
@@ -215,9 +378,8 @@ std::string Version::DebugString() const {
AppendEscapedStringTo(&r, files[i]->smallest.Encode());
r.append("' .. '");
AppendEscapedStringTo(&r, files[i]->largest.Encode());
- r.append("']");
+ r.append("']\n");
}
- r.push_back('\n');
}
return r;
}
@@ -305,6 +467,23 @@ class VersionSet::Builder {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;
+
+ // We arrange to automatically compact this file after
+ // a certain number of seeks. Let's assume:
+ // (1) One seek costs 10ms
+ // (2) Writing or reading 1MB costs 10ms (100MB/s)
+ // (3) A compaction of 1MB does 25MB of IO:
+ // 1MB read from this level
+ // 10-12MB read from next level (boundaries may be misaligned)
+ // 10-12MB written to next level
+ // This implies that 25 seeks cost the same as the compaction
+ // of 1MB of data. I.e., one seek costs approximately the
+ // same as the compaction of 40KB of data. We are a little
+ // conservative and allow approximately one seek for every 16KB
+ // of data before triggering a compaction.
+ f->allowed_seeks = (f->file_size / 16384);
+ if (f->allowed_seeks < 100) f->allowed_seeks = 100;
+
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
@@ -363,8 +542,14 @@ class VersionSet::Builder {
if (levels_[level].deleted_files.count(f->number) > 0) {
// File is deleted: do nothing
} else {
+ std::vector<FileMetaData*>* files = &v->files_[level];
+ if (level > 0 && !files->empty()) {
+ // Must not overlap
+ assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
+ f->smallest) < 0);
+ }
f->refs++;
- v->files_[level].push_back(f);
+ files->push_back(f);
}
}
};
@@ -749,7 +934,7 @@ int64_t VersionSet::NumLevelBytes(int level) const {
int64_t VersionSet::MaxNextLevelOverlappingBytes() {
int64_t result = 0;
std::vector<FileMetaData*> overlaps;
- for (int level = 0; level < config::kNumLevels - 1; level++) {
+ for (int level = 1; level < config::kNumLevels - 1; level++) {
for (size_t i = 0; i < current_->files_[level].size(); i++) {
const FileMetaData* f = current_->files_[level][i];
GetOverlappingInputs(level+1, f->smallest, f->largest, &overlaps);
@@ -854,31 +1039,43 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
}
Compaction* VersionSet::PickCompaction() {
- if (!NeedsCompaction()) {
+ Compaction* c;
+ int level;
+
+ // We prefer compactions triggered by too much data in a level over
+ // the compactions triggered by seeks.
+ const bool size_compaction = (current_->compaction_score_ >= 1);
+ const bool seek_compaction = (current_->file_to_compact_ != NULL);
+ if (size_compaction) {
+ level = current_->compaction_level_;
+ assert(level >= 0);
+ assert(level+1 < config::kNumLevels);
+ c = new Compaction(level);
+
+ // Pick the first file that comes after compact_pointer_[level]
+ for (size_t i = 0; i < current_->files_[level].size(); i++) {
+ FileMetaData* f = current_->files_[level][i];
+ if (compact_pointer_[level].empty() ||
+ icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
+ c->inputs_[0].push_back(f);
+ break;
+ }
+ }
+ if (c->inputs_[0].empty()) {
+ // Wrap-around to the beginning of the key space
+ c->inputs_[0].push_back(current_->files_[level][0]);
+ }
+ } else if (seek_compaction) {
+ level = current_->file_to_compact_level_;
+ c = new Compaction(level);
+ c->inputs_[0].push_back(current_->file_to_compact_);
+ } else {
return NULL;
}
- const int level = current_->compaction_level_;
- assert(level >= 0);
- assert(level+1 < config::kNumLevels);
- Compaction* c = new Compaction(level);
c->input_version_ = current_;
c->input_version_->Ref();
- // Pick the first file that comes after compact_pointer_[level]
- for (size_t i = 0; i < current_->files_[level].size(); i++) {
- FileMetaData* f = current_->files_[level][i];
- if (compact_pointer_[level].empty() ||
- icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
- c->inputs_[0].push_back(f);
- break;
- }
- }
- if (c->inputs_[0].empty()) {
- // Wrap-around to the beginning of the key space
- c->inputs_[0].push_back(current_->files_[level][0]);
- }
-
// Files in level 0 may overlap each other, so pick up all overlapping ones
if (level == 0) {
InternalKey smallest, largest;
diff --git a/db/version_set.h b/db/version_set.h
index 2bac5e2..f00c35a 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -35,6 +35,21 @@ class Version;
class VersionSet;
class WritableFile;
+// Return the smallest index i such that files[i]->largest >= key.
+// Return files.size() if there is no such file.
+// REQUIRES: "files" contains a sorted list of non-overlapping files.
+extern int FindFile(const InternalKeyComparator& icmp,
+ const std::vector<FileMetaData*>& files,
+ const Slice& key);
+
+// Returns true iff some file in "files" overlaps some part of
+// [smallest,largest].
+extern bool SomeFileOverlapsRange(
+ const InternalKeyComparator& icmp,
+ const std::vector<FileMetaData*>& files,
+ const InternalKey& smallest,
+ const InternalKey& largest);
+
class Version {
public:
// Append to *iters a sequence of iterators that will
@@ -42,11 +57,34 @@ class Version {
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);
+ // Lookup the value for key. If found, store it in *val and
+ // return OK. Else return a non-OK status. Fills *stats.
+ // REQUIRES: lock is not held
+ struct GetStats {
+ FileMetaData* seek_file;
+ int seek_file_level;
+ };
+ Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
+ GetStats* stats);
+
+ // Adds "stats" into the current state. Returns true if a new
+ // compaction may need to be triggered, false otherwise.
+ // REQUIRES: lock is held
+ bool UpdateStats(const GetStats& stats);
+
// Reference count management (so Versions do not disappear out from
// under live iterators)
void Ref();
void Unref();
+ // Returns true iff some file in the specified level overlaps
+ // some part of [smallest,largest].
+ bool OverlapInLevel(int level,
+ const InternalKey& smallest,
+ const InternalKey& largest);
+
+ int NumFiles(int level) const { return files_[level].size(); }
+
// Return a human readable string that describes this version's contents.
std::string DebugString() const;
@@ -65,6 +103,10 @@ class Version {
// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
+ // Next file to compact based on seek stats.
+ FileMetaData* file_to_compact_;
+ int file_to_compact_level_;
+
// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
@@ -73,6 +115,8 @@ class Version {
explicit Version(VersionSet* vset)
: vset_(vset), next_(this), prev_(this), refs_(0),
+ file_to_compact_(NULL),
+ file_to_compact_level_(-1),
compaction_score_(-1),
compaction_level_(-1) {
}
@@ -158,7 +202,10 @@ class VersionSet {
Iterator* MakeInputIterator(Compaction* c);
// Returns true iff some level needs a compaction.
- bool NeedsCompaction() const { return current_->compaction_score_ >= 1; }
+ bool NeedsCompaction() const {
+ Version* v = current_;
+ return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
+ }
// Add all files listed in any live version to *live.
// May also mutate some internal state.
diff --git a/port/port_posix.h b/port/port_posix.h
index c158db1..d0b0615 100644
--- a/port/port_posix.h
+++ b/port/port_posix.h
@@ -9,6 +9,9 @@
#include <endian.h>
#include <pthread.h>
+#ifdef SNAPPY
+#include <snappy.h>
+#endif
#include <stdint.h>
#include <string>
#include <cstdatomic>
@@ -72,15 +75,30 @@ class AtomicPointer {
}
};
-// TODO(gabor): Implement actual compress
inline bool Snappy_Compress(const char* input, size_t input_length,
std::string* output) {
+#ifdef SNAPPY
+ output->resize(snappy::MaxCompressedLength(input_length));
+ size_t outlen;
+ snappy::RawCompress(input, input_length, &(*output)[0], &outlen);
+ output->resize(outlen);
+ return true;
+#endif
+
return false;
}
-// TODO(gabor): Implement actual uncompress
inline bool Snappy_Uncompress(const char* input_data, size_t input_length,
std::string* output) {
+#ifdef SNAPPY
+ size_t ulength;
+ if (!snappy::GetUncompressedLength(input_data, ulength, &ulength)) {
+ return false;
+ }
+ output->resize(ulength);
+ return snappy::RawUncompress(input_data, input_length, &(*output)[0]);
+#endif
+
return false;
}
diff --git a/table/table_test.cc b/table/table_test.cc
index cf2bae0..10d08fc 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -727,11 +727,15 @@ TEST(Harness, RandomizedLongDB) {
Test(&rnd);
// We must have created enough data to force merging
- std::string l0_files, l1_files;
- ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level0", &l0_files));
- ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level1", &l1_files));
- ASSERT_GT(atoi(l0_files.c_str()) + atoi(l1_files.c_str()), 0);
-
+ int files = 0;
+ for (int level = 0; level < config::kNumLevels; level++) {
+ std::string value;
+ char name[100];
+ snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level);
+ ASSERT_TRUE(db()->GetProperty(name, &value));
+ files += atoi(value.c_str());
+ }
+ ASSERT_GT(files, 0);
}
class MemTableTest { };