diff options
Diffstat (limited to 'db/db_impl.cc')
-rw-r--r-- | db/db_impl.cc | 217 |
1 files changed, 30 insertions, 187 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc index d012236..3b9e04e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -81,8 +81,8 @@ class NullWritableFile : public WritableFile { // Fix user-supplied options to be reasonable template <class T,class V> static void ClipToRange(T* ptr, V minvalue, V maxvalue) { - if (*ptr > maxvalue) *ptr = maxvalue; - if (*ptr < minvalue) *ptr = minvalue; + if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue; + if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue; } Options SanitizeOptions(const std::string& dbname, const InternalKeyComparator* icmp, @@ -91,7 +91,6 @@ Options SanitizeOptions(const std::string& dbname, result.comparator = icmp; ClipToRange(&result.max_open_files, 20, 50000); ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); - ClipToRange(&result.large_value_threshold, 16<<10, 1<<30); ClipToRange(&result.block_size, 1<<10, 4<<20); if (result.info_log == NULL) { // Open a log file in the same directory as the db @@ -213,15 +212,12 @@ void DBImpl::DeleteObsoleteFiles() { std::set<uint64_t> live = pending_outputs_; versions_->AddLiveFiles(&live); - versions_->CleanupLargeValueRefs(live); - std::vector<std::string> filenames; env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose uint64_t number; - LargeValueRef large_ref; FileType type; - for (int i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &large_ref, &type)) { + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type)) { bool keep = true; switch (type) { case kLogFile: @@ -241,9 +237,6 @@ void DBImpl::DeleteObsoleteFiles() { // be recorded in pending_outputs_, which is inserted into "live" keep = (live.find(number) != live.end()); break; - case kLargeValueFile: - keep = versions_->LargeValueIsLive(large_ref); - break; case kCurrentFile: case kDBLockFile: case kInfoLogFile: @@ -599,7 +592,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact) { assert(compact->outfile == NULL); } delete compact->outfile; - for (int i = 0; i < compact->outputs.size(); i++) { + for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; pending_outputs_.erase(out.number); } @@ -695,7 +688,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { // Add compaction outputs compact->compaction->AddInputDeletions(compact->compaction->edit()); const int level = compact->compaction->level(); - for (int i = 0; i < compact->outputs.size(); i++) { + for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile( level + 1, @@ -710,7 +703,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { DeleteObsoleteFiles(); } else { // Discard any files we may have created during this failed compaction - for (int i = 0; i < compact->outputs.size(); i++) { + for (size_t i = 0; i < compact->outputs.size(); i++) { env_->DeleteFile(TableFileName(dbname_, compact->outputs[i].number)); } } @@ -811,7 +804,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, " "%d smallest_snapshot: %d", ikey.user_key.ToString().c_str(), - (int)ikey.sequence, ikey.type, kTypeLargeValueRef, drop, + (int)ikey.sequence, ikey.type, kTypeValue, drop, compact->compaction->IsBaseLevelForKey(ikey.user_key), (int)last_sequence_for_key, (int)compact->smallest_snapshot); #endif @@ -828,26 +821,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); - - if (ikey.type == kTypeLargeValueRef) { - if (input->value().size() != LargeValueRef::ByteSize()) { - if (options_.paranoid_checks) { - status = Status::Corruption("invalid large value ref"); - break; - } else { - Log(env_, options_.info_log, - "compaction found invalid large value ref"); - } - } else { - compact->compaction->edit()->AddLargeValueRef( - LargeValueRef::FromRef(input->value()), - compact->current_output()->number, - input->key()); - compact->builder->Add(key, input->value()); - } - } else { - compact->builder->Add(key, input->value()); - } + compact->builder->Add(key, input->value()); // Close output file if it is big enough if (compact->builder->FileSize() >= @@ -881,7 +855,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { stats.bytes_read += compact->compaction->input(which, i)->file_size; } } - for (int i = 0; i < compact->outputs.size(); i++) { + for (size_t i = 0; i < compact->outputs.size(); i++) { stats.bytes_written += compact->outputs[i].file_size; } @@ -985,40 +959,27 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Status status; - - WriteBatch* final = NULL; - { - MutexLock l(&mutex_); - status = MakeRoomForWrite(false); // May temporarily release lock and wait - - uint64_t last_sequence = versions_->LastSequence(); - if (status.ok()) { - status = HandleLargeValues(last_sequence + 1, updates, &final); + MutexLock l(&mutex_); + status = MakeRoomForWrite(false); // May temporarily release lock and wait + uint64_t last_sequence = versions_->LastSequence(); + if (status.ok()) { + WriteBatchInternal::SetSequence(updates, last_sequence + 1); + last_sequence += WriteBatchInternal::Count(updates); + versions_->SetLastSequence(last_sequence); + + // Add to log and apply to memtable + status = log_->AddRecord(WriteBatchInternal::Contents(updates)); + if (status.ok() && options.sync) { + status = logfile_->Sync(); } if (status.ok()) { - WriteBatchInternal::SetSequence(final, last_sequence + 1); - last_sequence += WriteBatchInternal::Count(final); - versions_->SetLastSequence(last_sequence); - - // Add to log and apply to memtable - status = log_->AddRecord(WriteBatchInternal::Contents(final)); - if (status.ok() && options.sync) { - status = logfile_->Sync(); - } - if (status.ok()) { - status = WriteBatchInternal::InsertInto(final, mem_); - } - } - - if (options.post_write_snapshot != NULL) { - *options.post_write_snapshot = - status.ok() ? snapshots_.New(last_sequence) : NULL; + status = WriteBatchInternal::InsertInto(updates, mem_); } } - if (final != updates) { - delete final; + if (options.post_write_snapshot != NULL) { + *options.post_write_snapshot = + status.ok() ? snapshots_.New(last_sequence) : NULL; } - return status; } @@ -1070,124 +1031,6 @@ Status DBImpl::MakeRoomForWrite(bool force) { return s; } -bool DBImpl::HasLargeValues(const WriteBatch& batch) const { - if (WriteBatchInternal::ByteSize(&batch) >= options_.large_value_threshold) { - for (WriteBatchInternal::Iterator it(batch); !it.Done(); it.Next()) { - if (it.op() == kTypeValue && - it.value().size() >= options_.large_value_threshold) { - return true; - } - } - } - return false; -} - -// Given "raw_value", determines the appropriate compression format to use -// and stores the data that should be written to the large value file in -// "*file_bytes", and sets "*ref" to the appropriate large value reference. -// May use "*scratch" as backing store for "*file_bytes". -void DBImpl::MaybeCompressLargeValue( - const Slice& raw_value, - Slice* file_bytes, - std::string* scratch, - LargeValueRef* ref) { - switch (options_.compression) { - case kSnappyCompression: { - if (port::Snappy_Compress(raw_value.data(), raw_value.size(), scratch) && - (scratch->size() < (raw_value.size() / 8) * 7)) { - *file_bytes = *scratch; - *ref = LargeValueRef::Make(raw_value, kSnappyCompression); - return; - } - - // Less than 12.5% compression: just leave as uncompressed data - break; - } - case kNoCompression: - // Use default code outside of switch - break; - } - // Store as uncompressed data - *file_bytes = raw_value; - *ref = LargeValueRef::Make(raw_value, kNoCompression); -} - -Status DBImpl::HandleLargeValues(SequenceNumber assigned_seq, - WriteBatch* updates, - WriteBatch** final) { - if (!HasLargeValues(*updates)) { - // Fast path: no large values found - *final = updates; - } else { - // Copy *updates to a new WriteBatch, replacing the references to - *final = new WriteBatch; - SequenceNumber seq = assigned_seq; - for (WriteBatchInternal::Iterator it(*updates); !it.Done(); it.Next()) { - switch (it.op()) { - case kTypeValue: - if (it.value().size() < options_.large_value_threshold) { - (*final)->Put(it.key(), it.value()); - } else { - std::string scratch; - Slice file_bytes; - LargeValueRef large_ref; - MaybeCompressLargeValue( - it.value(), &file_bytes, &scratch, &large_ref); - InternalKey ikey(it.key(), seq, kTypeLargeValueRef); - if (versions_->RegisterLargeValueRef( - large_ref, versions_->LogNumber(), ikey)) { - // TODO(opt): avoid holding the lock here (but be careful about - // another thread doing a Write and switching logs or - // having us get a different "assigned_seq" value). - - uint64_t tmp_number = versions_->NewFileNumber(); - pending_outputs_.insert(tmp_number); - std::string tmp = TempFileName(dbname_, tmp_number); - WritableFile* file; - Status s = env_->NewWritableFile(tmp, &file); - if (!s.ok()) { - return s; // Caller will delete *final - } - - file->Append(file_bytes); - - s = file->Close(); - delete file; - - if (s.ok()) { - const std::string fname = - LargeValueFileName(dbname_, large_ref); - s = env_->RenameFile(tmp, fname); - } else { - Log(env_, options_.info_log, "Write large value: %s", - s.ToString().c_str()); - } - pending_outputs_.erase(tmp_number); - - if (!s.ok()) { - env_->DeleteFile(tmp); // Cleanup; intentionally ignoring error - return s; // Caller will delete *final - } - } - - // Put an indirect reference in the write batch in place - // of large value - WriteBatchInternal::PutLargeValueRef(*final, it.key(), large_ref); - } - break; - case kTypeLargeValueRef: - return Status::Corruption("Corrupted write batch"); - break; - case kTypeDeletion: - (*final)->Delete(it.key()); - break; - } - seq = seq + 1; - } - } - return Status::OK(); -} - bool DBImpl::GetProperty(const Slice& property, std::string* value) { value->clear(); @@ -1205,7 +1048,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { return false; } else { char buf[100]; - snprintf(buf, sizeof(buf), "%d", versions_->NumLevelFiles(level)); + snprintf(buf, sizeof(buf), "%d", + versions_->NumLevelFiles(static_cast<int>(level))); *value = buf; return true; } @@ -1325,10 +1169,9 @@ Status DestroyDB(const std::string& dbname, const Options& options) { Status result = env->LockFile(LockFileName(dbname), &lock); if (result.ok()) { uint64_t number; - LargeValueRef large_ref; FileType type; - for (int i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &large_ref, &type)) { + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type)) { Status del = env->DeleteFile(dbname + "/" + filenames[i]); if (result.ok() && !del.ok()) { result = del; |