summaryrefslogtreecommitdiff
path: root/db/db_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'db/db_impl.cc')
-rw-r--r--db/db_impl.cc217
1 files changed, 30 insertions, 187 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc
index d012236..3b9e04e 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -81,8 +81,8 @@ class NullWritableFile : public WritableFile {
// Fix user-supplied options to be reasonable
template <class T,class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
- if (*ptr > maxvalue) *ptr = maxvalue;
- if (*ptr < minvalue) *ptr = minvalue;
+ if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
+ if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
Options SanitizeOptions(const std::string& dbname,
const InternalKeyComparator* icmp,
@@ -91,7 +91,6 @@ Options SanitizeOptions(const std::string& dbname,
result.comparator = icmp;
ClipToRange(&result.max_open_files, 20, 50000);
ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
- ClipToRange(&result.large_value_threshold, 16<<10, 1<<30);
ClipToRange(&result.block_size, 1<<10, 4<<20);
if (result.info_log == NULL) {
// Open a log file in the same directory as the db
@@ -213,15 +212,12 @@ void DBImpl::DeleteObsoleteFiles() {
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
- versions_->CleanupLargeValueRefs(live);
-
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
uint64_t number;
- LargeValueRef large_ref;
FileType type;
- for (int i = 0; i < filenames.size(); i++) {
- if (ParseFileName(filenames[i], &number, &large_ref, &type)) {
+ for (size_t i = 0; i < filenames.size(); i++) {
+ if (ParseFileName(filenames[i], &number, &type)) {
bool keep = true;
switch (type) {
case kLogFile:
@@ -241,9 +237,6 @@ void DBImpl::DeleteObsoleteFiles() {
// be recorded in pending_outputs_, which is inserted into "live"
keep = (live.find(number) != live.end());
break;
- case kLargeValueFile:
- keep = versions_->LargeValueIsLive(large_ref);
- break;
case kCurrentFile:
case kDBLockFile:
case kInfoLogFile:
@@ -599,7 +592,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact) {
assert(compact->outfile == NULL);
}
delete compact->outfile;
- for (int i = 0; i < compact->outputs.size(); i++) {
+ for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
pending_outputs_.erase(out.number);
}
@@ -695,7 +688,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
// Add compaction outputs
compact->compaction->AddInputDeletions(compact->compaction->edit());
const int level = compact->compaction->level();
- for (int i = 0; i < compact->outputs.size(); i++) {
+ for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(
level + 1,
@@ -710,7 +703,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
DeleteObsoleteFiles();
} else {
// Discard any files we may have created during this failed compaction
- for (int i = 0; i < compact->outputs.size(); i++) {
+ for (size_t i = 0; i < compact->outputs.size(); i++) {
env_->DeleteFile(TableFileName(dbname_, compact->outputs[i].number));
}
}
@@ -811,7 +804,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
" Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
"%d smallest_snapshot: %d",
ikey.user_key.ToString().c_str(),
- (int)ikey.sequence, ikey.type, kTypeLargeValueRef, drop,
+ (int)ikey.sequence, ikey.type, kTypeValue, drop,
compact->compaction->IsBaseLevelForKey(ikey.user_key),
(int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif
@@ -828,26 +821,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
-
- if (ikey.type == kTypeLargeValueRef) {
- if (input->value().size() != LargeValueRef::ByteSize()) {
- if (options_.paranoid_checks) {
- status = Status::Corruption("invalid large value ref");
- break;
- } else {
- Log(env_, options_.info_log,
- "compaction found invalid large value ref");
- }
- } else {
- compact->compaction->edit()->AddLargeValueRef(
- LargeValueRef::FromRef(input->value()),
- compact->current_output()->number,
- input->key());
- compact->builder->Add(key, input->value());
- }
- } else {
- compact->builder->Add(key, input->value());
- }
+ compact->builder->Add(key, input->value());
// Close output file if it is big enough
if (compact->builder->FileSize() >=
@@ -881,7 +855,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
stats.bytes_read += compact->compaction->input(which, i)->file_size;
}
}
- for (int i = 0; i < compact->outputs.size(); i++) {
+ for (size_t i = 0; i < compact->outputs.size(); i++) {
stats.bytes_written += compact->outputs[i].file_size;
}
@@ -985,40 +959,27 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status status;
-
- WriteBatch* final = NULL;
- {
- MutexLock l(&mutex_);
- status = MakeRoomForWrite(false); // May temporarily release lock and wait
-
- uint64_t last_sequence = versions_->LastSequence();
- if (status.ok()) {
- status = HandleLargeValues(last_sequence + 1, updates, &final);
+ MutexLock l(&mutex_);
+ status = MakeRoomForWrite(false); // May temporarily release lock and wait
+ uint64_t last_sequence = versions_->LastSequence();
+ if (status.ok()) {
+ WriteBatchInternal::SetSequence(updates, last_sequence + 1);
+ last_sequence += WriteBatchInternal::Count(updates);
+ versions_->SetLastSequence(last_sequence);
+
+ // Add to log and apply to memtable
+ status = log_->AddRecord(WriteBatchInternal::Contents(updates));
+ if (status.ok() && options.sync) {
+ status = logfile_->Sync();
}
if (status.ok()) {
- WriteBatchInternal::SetSequence(final, last_sequence + 1);
- last_sequence += WriteBatchInternal::Count(final);
- versions_->SetLastSequence(last_sequence);
-
- // Add to log and apply to memtable
- status = log_->AddRecord(WriteBatchInternal::Contents(final));
- if (status.ok() && options.sync) {
- status = logfile_->Sync();
- }
- if (status.ok()) {
- status = WriteBatchInternal::InsertInto(final, mem_);
- }
- }
-
- if (options.post_write_snapshot != NULL) {
- *options.post_write_snapshot =
- status.ok() ? snapshots_.New(last_sequence) : NULL;
+ status = WriteBatchInternal::InsertInto(updates, mem_);
}
}
- if (final != updates) {
- delete final;
+ if (options.post_write_snapshot != NULL) {
+ *options.post_write_snapshot =
+ status.ok() ? snapshots_.New(last_sequence) : NULL;
}
-
return status;
}
@@ -1070,124 +1031,6 @@ Status DBImpl::MakeRoomForWrite(bool force) {
return s;
}
-bool DBImpl::HasLargeValues(const WriteBatch& batch) const {
- if (WriteBatchInternal::ByteSize(&batch) >= options_.large_value_threshold) {
- for (WriteBatchInternal::Iterator it(batch); !it.Done(); it.Next()) {
- if (it.op() == kTypeValue &&
- it.value().size() >= options_.large_value_threshold) {
- return true;
- }
- }
- }
- return false;
-}
-
-// Given "raw_value", determines the appropriate compression format to use
-// and stores the data that should be written to the large value file in
-// "*file_bytes", and sets "*ref" to the appropriate large value reference.
-// May use "*scratch" as backing store for "*file_bytes".
-void DBImpl::MaybeCompressLargeValue(
- const Slice& raw_value,
- Slice* file_bytes,
- std::string* scratch,
- LargeValueRef* ref) {
- switch (options_.compression) {
- case kSnappyCompression: {
- if (port::Snappy_Compress(raw_value.data(), raw_value.size(), scratch) &&
- (scratch->size() < (raw_value.size() / 8) * 7)) {
- *file_bytes = *scratch;
- *ref = LargeValueRef::Make(raw_value, kSnappyCompression);
- return;
- }
-
- // Less than 12.5% compression: just leave as uncompressed data
- break;
- }
- case kNoCompression:
- // Use default code outside of switch
- break;
- }
- // Store as uncompressed data
- *file_bytes = raw_value;
- *ref = LargeValueRef::Make(raw_value, kNoCompression);
-}
-
-Status DBImpl::HandleLargeValues(SequenceNumber assigned_seq,
- WriteBatch* updates,
- WriteBatch** final) {
- if (!HasLargeValues(*updates)) {
- // Fast path: no large values found
- *final = updates;
- } else {
- // Copy *updates to a new WriteBatch, replacing the references to
- *final = new WriteBatch;
- SequenceNumber seq = assigned_seq;
- for (WriteBatchInternal::Iterator it(*updates); !it.Done(); it.Next()) {
- switch (it.op()) {
- case kTypeValue:
- if (it.value().size() < options_.large_value_threshold) {
- (*final)->Put(it.key(), it.value());
- } else {
- std::string scratch;
- Slice file_bytes;
- LargeValueRef large_ref;
- MaybeCompressLargeValue(
- it.value(), &file_bytes, &scratch, &large_ref);
- InternalKey ikey(it.key(), seq, kTypeLargeValueRef);
- if (versions_->RegisterLargeValueRef(
- large_ref, versions_->LogNumber(), ikey)) {
- // TODO(opt): avoid holding the lock here (but be careful about
- // another thread doing a Write and switching logs or
- // having us get a different "assigned_seq" value).
-
- uint64_t tmp_number = versions_->NewFileNumber();
- pending_outputs_.insert(tmp_number);
- std::string tmp = TempFileName(dbname_, tmp_number);
- WritableFile* file;
- Status s = env_->NewWritableFile(tmp, &file);
- if (!s.ok()) {
- return s; // Caller will delete *final
- }
-
- file->Append(file_bytes);
-
- s = file->Close();
- delete file;
-
- if (s.ok()) {
- const std::string fname =
- LargeValueFileName(dbname_, large_ref);
- s = env_->RenameFile(tmp, fname);
- } else {
- Log(env_, options_.info_log, "Write large value: %s",
- s.ToString().c_str());
- }
- pending_outputs_.erase(tmp_number);
-
- if (!s.ok()) {
- env_->DeleteFile(tmp); // Cleanup; intentionally ignoring error
- return s; // Caller will delete *final
- }
- }
-
- // Put an indirect reference in the write batch in place
- // of large value
- WriteBatchInternal::PutLargeValueRef(*final, it.key(), large_ref);
- }
- break;
- case kTypeLargeValueRef:
- return Status::Corruption("Corrupted write batch");
- break;
- case kTypeDeletion:
- (*final)->Delete(it.key());
- break;
- }
- seq = seq + 1;
- }
- }
- return Status::OK();
-}
-
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
value->clear();
@@ -1205,7 +1048,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
return false;
} else {
char buf[100];
- snprintf(buf, sizeof(buf), "%d", versions_->NumLevelFiles(level));
+ snprintf(buf, sizeof(buf), "%d",
+ versions_->NumLevelFiles(static_cast<int>(level)));
*value = buf;
return true;
}
@@ -1325,10 +1169,9 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
Status result = env->LockFile(LockFileName(dbname), &lock);
if (result.ok()) {
uint64_t number;
- LargeValueRef large_ref;
FileType type;
- for (int i = 0; i < filenames.size(); i++) {
- if (ParseFileName(filenames[i], &number, &large_ref, &type)) {
+ for (size_t i = 0; i < filenames.size(); i++) {
+ if (ParseFileName(filenames[i], &number, &type)) {
Status del = env->DeleteFile(dbname + "/" + filenames[i]);
if (result.ok() && !del.ok()) {
result = del;