summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgabor@google.com <gabor@google.com@62dab493-f737-651d-591e-8d6aee1b9529>2011-07-21 02:40:18 +0000
committergabor@google.com <gabor@google.com@62dab493-f737-651d-591e-8d6aee1b9529>2011-07-21 02:40:18 +0000
commit60bd8015f21fdb63d5409b1191f8ea9d8f1a1b87 (patch)
treedab21fd0d1309be4e6851f690e1c011e79ddbf6b
parent6872ace90110799f87402cbc594c4cbf1bc474c7 (diff)
downloadleveldb-60bd8015f21fdb63d5409b1191f8ea9d8f1a1b87.tar.gz
Speed up Snappy uncompression, new Logger interface.
- Removed one copy of an uncompressed block contents changing the signature of Snappy_Uncompress() so it uncompresses into a flat array instead of a std::string. Speeds up readrandom ~10%. - Instead of a combination of Env/WritableFile, we now have a Logger interface that can be easily overridden applications that want to supply their own logging. - Separated out the gcc and Sun Studio parts of atomic_pointer.h so we can use 'asm', 'volatile' keywords for Sun Studio. git-svn-id: https://leveldb.googlecode.com/svn/trunk@39 62dab493-f737-651d-591e-8d6aee1b9529
-rw-r--r--db/db_bench.cc7
-rw-r--r--db/db_impl.cc45
-rw-r--r--db/repair.cc20
-rw-r--r--db/version_set.cc4
-rw-r--r--include/leveldb/env.h29
-rw-r--r--include/leveldb/options.h6
-rw-r--r--port/atomic_pointer.h14
-rw-r--r--port/port_android.h8
-rw-r--r--port/port_chromium.cc18
-rw-r--r--port/port_chromium.h4
-rw-r--r--port/port_example.h12
-rw-r--r--port/port_posix.h26
-rw-r--r--table/format.cc20
-rw-r--r--util/env.cc15
-rw-r--r--util/env_chromium.cc74
-rw-r--r--util/env_posix.cc74
-rw-r--r--util/posix_logger.h97
17 files changed, 261 insertions, 212 deletions
diff --git a/db/db_bench.cc b/db/db_bench.cc
index 53b8c53..7b4e41a 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -472,13 +472,14 @@ class Benchmark {
std::string compressed;
bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
int64_t bytes = 0;
- std::string uncompressed;
+ char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
- &uncompressed);
- bytes += uncompressed.size();
+ uncompressed);
+ bytes += input.size();
FinishedSingleOp();
}
+ delete[] uncompressed;
if (!ok) {
message_ = "(snappy failure)";
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 48056da..5a0648e 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -68,16 +68,6 @@ struct DBImpl::CompactionState {
}
};
-namespace {
-class NullWritableFile : public WritableFile {
- public:
- virtual Status Append(const Slice& data) { return Status::OK(); }
- virtual Status Close() { return Status::OK(); }
- virtual Status Flush() { return Status::OK(); }
- virtual Status Sync() { return Status::OK(); }
-};
-}
-
// Fix user-supplied options to be reasonable
template <class T,class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
@@ -96,11 +86,10 @@ Options SanitizeOptions(const std::string& dbname,
// Open a log file in the same directory as the db
src.env->CreateDir(dbname); // In case it does not exist
src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
- Status s = src.env->NewWritableFile(InfoLogFileName(dbname),
- &result.info_log);
+ Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
if (!s.ok()) {
// No place suitable for logging
- result.info_log = new NullWritableFile;
+ result.info_log = NULL;
}
}
if (result.block_cache == NULL) {
@@ -201,7 +190,7 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
if (s->ok() || options_.paranoid_checks) {
// No change needed
} else {
- Log(env_, options_.info_log, "Ignoring error %s", s->ToString().c_str());
+ Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
*s = Status::OK();
}
}
@@ -247,7 +236,7 @@ void DBImpl::DeleteObsoleteFiles() {
if (type == kTableFile) {
table_cache_->Evict(number);
}
- Log(env_, options_.info_log, "Delete type=%d #%lld\n",
+ Log(options_.info_log, "Delete type=%d #%lld\n",
int(type),
static_cast<unsigned long long>(number));
env_->DeleteFile(dbname_ + "/" + filenames[i]);
@@ -336,11 +325,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
- WritableFile* info_log;
+ Logger* info_log;
const char* fname;
Status* status; // NULL if options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
- Log(env, info_log, "%s%s: dropping %d bytes; %s",
+ Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == NULL ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != NULL && this->status->ok()) *this->status = s;
@@ -370,7 +359,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// large sequence numbers).
log::Reader reader(file, &reporter, true/*checksum*/,
0/*initial_offset*/);
- Log(env_, options_.info_log, "Recovering log #%llu",
+ Log(options_.info_log, "Recovering log #%llu",
(unsigned long long) log_number);
// Read all the records and add to a memtable
@@ -434,7 +423,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
meta.number = versions_->NewFileNumber();
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
- Log(env_, options_.info_log, "Level-0 table #%llu: started",
+ Log(options_.info_log, "Level-0 table #%llu: started",
(unsigned long long) meta.number);
Status s;
@@ -444,7 +433,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
mutex_.Lock();
}
- Log(env_, options_.info_log, "Level-0 table #%llu: %lld bytes %s",
+ Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
(unsigned long long) meta.number,
(unsigned long long) meta.file_size,
s.ToString().c_str());
@@ -613,7 +602,7 @@ void DBImpl::BackgroundCompaction() {
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit());
VersionSet::LevelSummaryStorage tmp;
- Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
+ Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->file_size),
@@ -631,7 +620,7 @@ void DBImpl::BackgroundCompaction() {
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
} else {
- Log(env_, options_.info_log,
+ Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
if (options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
@@ -727,7 +716,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
s = iter->status();
delete iter;
if (s.ok()) {
- Log(env_, options_.info_log,
+ Log(options_.info_log,
"Generated table #%llu: %lld keys, %lld bytes",
(unsigned long long) output_number,
(unsigned long long) current_entries,
@@ -740,7 +729,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
- Log(env_, options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
+ Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
@@ -776,7 +765,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
- Log(env_, options_.info_log, "Compacting %d@%d + %d@%d files",
+ Log(options_.info_log, "Compacting %d@%d + %d@%d files",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
@@ -859,7 +848,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
last_sequence_for_key = ikey.sequence;
}
#if 0
- Log(env_, options_.info_log,
+ Log(options_.info_log,
" Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
"%d smallest_snapshot: %d",
ikey.user_key.ToString().c_str(),
@@ -925,7 +914,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
status = InstallCompactionResults(compact);
}
VersionSet::LevelSummaryStorage tmp;
- Log(env_, options_.info_log,
+ Log(options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
@@ -1112,7 +1101,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
bg_cv_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
- Log(env_, options_.info_log, "waiting...\n");
+ Log(options_.info_log, "waiting...\n");
bg_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
diff --git a/db/repair.cc b/db/repair.cc
index 2e3f506..5bcdb56 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -78,7 +78,7 @@ class Repairer {
for (size_t i = 0; i < tables_.size(); i++) {
bytes += tables_[i].meta.file_size;
}
- Log(env_, options_.info_log,
+ Log(options_.info_log,
"**** Repaired leveldb %s; "
"recovered %d files; %llu bytes. "
"Some data may have been lost. "
@@ -149,7 +149,7 @@ class Repairer {
std::string logname = LogFileName(dbname_, logs_[i]);
Status status = ConvertLogToTable(logs_[i]);
if (!status.ok()) {
- Log(env_, options_.info_log, "Log #%llu: ignoring conversion error: %s",
+ Log(options_.info_log, "Log #%llu: ignoring conversion error: %s",
(unsigned long long) logs_[i],
status.ToString().c_str());
}
@@ -160,11 +160,11 @@ class Repairer {
Status ConvertLogToTable(uint64_t log) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
- WritableFile* info_log;
+ Logger* info_log;
uint64_t lognum;
virtual void Corruption(size_t bytes, const Status& s) {
// We print error messages for corruption, but continue repairing.
- Log(env, info_log, "Log #%llu: dropping %d bytes; %s",
+ Log(info_log, "Log #%llu: dropping %d bytes; %s",
(unsigned long long) lognum,
static_cast<int>(bytes),
s.ToString().c_str());
@@ -209,7 +209,7 @@ class Repairer {
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {
- Log(env_, options_.info_log, "Log #%llu: ignoring %s",
+ Log(options_.info_log, "Log #%llu: ignoring %s",
(unsigned long long) log,
status.ToString().c_str());
status = Status::OK(); // Keep going with rest of file
@@ -231,7 +231,7 @@ class Repairer {
table_numbers_.push_back(meta.number);
}
}
- Log(env_, options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
+ Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
(unsigned long long) log,
counter,
(unsigned long long) meta.number,
@@ -247,7 +247,7 @@ class Repairer {
Status status = ScanTable(&t);
if (!status.ok()) {
std::string fname = TableFileName(dbname_, table_numbers_[i]);
- Log(env_, options_.info_log, "Table #%llu: ignoring %s",
+ Log(options_.info_log, "Table #%llu: ignoring %s",
(unsigned long long) table_numbers_[i],
status.ToString().c_str());
ArchiveFile(fname);
@@ -270,7 +270,7 @@ class Repairer {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) {
- Log(env_, options_.info_log, "Table #%llu: unparsable key %s",
+ Log(options_.info_log, "Table #%llu: unparsable key %s",
(unsigned long long) t->meta.number,
EscapeString(key).c_str());
continue;
@@ -291,7 +291,7 @@ class Repairer {
}
delete iter;
}
- Log(env_, options_.info_log, "Table #%llu: %d entries %s",
+ Log(options_.info_log, "Table #%llu: %d entries %s",
(unsigned long long) t->meta.number,
counter,
status.ToString().c_str());
@@ -373,7 +373,7 @@ class Repairer {
new_file.append("/");
new_file.append((slash == NULL) ? fname.c_str() : slash + 1);
Status s = env_->RenameFile(fname, new_file);
- Log(env_, options_.info_log, "Archiving %s: %s\n",
+ Log(options_.info_log, "Archiving %s: %s\n",
fname.c_str(), s.ToString().c_str());
}
};
diff --git a/db/version_set.cc b/db/version_set.cc
index 62bd6dd..5040b72 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1124,7 +1124,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
std::vector<FileMetaData*> expanded1;
GetOverlappingInputs(level+1, new_start, new_limit, &expanded1);
if (expanded1.size() == c->inputs_[1].size()) {
- Log(env_, options_->info_log,
+ Log(options_->info_log,
"Expanding@%d %d+%d to %d+%d\n",
level,
int(c->inputs_[0].size()),
@@ -1147,7 +1147,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
}
if (false) {
- Log(env_, options_->info_log, "Compacting %d '%s' .. '%s'",
+ Log(options_->info_log, "Compacting %d '%s' .. '%s'",
level,
EscapeString(smallest.Encode()).c_str(),
EscapeString(largest.Encode()).c_str());
diff --git a/include/leveldb/env.h b/include/leveldb/env.h
index 39f6a1a..bf51008 100644
--- a/include/leveldb/env.h
+++ b/include/leveldb/env.h
@@ -22,6 +22,7 @@
namespace leveldb {
class FileLock;
+class Logger;
class RandomAccessFile;
class SequentialFile;
class Slice;
@@ -134,8 +135,8 @@ class Env {
// same directory.
virtual Status GetTestDirectory(std::string* path) = 0;
- // Write an entry to the log file with the specified format.
- virtual void Logv(WritableFile* log, const char* format, va_list ap) = 0;
+ // Create and return a log file for storing informational messages.
+ virtual Status NewLogger(const std::string& fname, Logger** result) = 0;
// Returns the number of micro-seconds since some fixed point in time. Only
// useful for computing deltas of time.
@@ -210,6 +211,22 @@ class WritableFile {
void operator=(const WritableFile&);
};
+// An interface for writing log messages.
+class Logger {
+ public:
+ Logger() { }
+ virtual ~Logger();
+
+ // Write an entry to the log file with the specified format.
+ virtual void Logv(const char* format, va_list ap) = 0;
+
+ private:
+ // No copying allowed
+ Logger(const Logger&);
+ void operator=(const Logger&);
+};
+
+
// Identifies a locked file.
class FileLock {
public:
@@ -222,9 +239,9 @@ class FileLock {
};
// Log the specified data to *info_log if info_log is non-NULL.
-extern void Log(Env* env, WritableFile* info_log, const char* format, ...)
+extern void Log(Logger* info_log, const char* format, ...)
# if defined(__GNUC__) || defined(__clang__)
- __attribute__((__format__ (__printf__, 3, 4)))
+ __attribute__((__format__ (__printf__, 2, 3)))
# endif
;
@@ -284,8 +301,8 @@ class EnvWrapper : public Env {
virtual Status GetTestDirectory(std::string* path) {
return target_->GetTestDirectory(path);
}
- virtual void Logv(WritableFile* log, const char* format, va_list ap) {
- return target_->Logv(log, format, ap);
+ virtual Status NewLogger(const std::string& fname, Logger** result) {
+ return target_->NewLogger(fname, result);
}
uint64_t NowMicros() {
return target_->NowMicros();
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index 0d4f6cd..381f228 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -12,8 +12,8 @@ namespace leveldb {
class Cache;
class Comparator;
class Env;
+class Logger;
class Snapshot;
-class WritableFile;
// DB contents are stored in a set of blocks, each of which holds a
// sequence of key,value pairs. Each block may be compressed before
@@ -61,10 +61,10 @@ struct Options {
Env* env;
// Any internal progress/error information generated by the db will
- // be to written to info_log if it is non-NULL, or to a file stored
+ // be written to info_log if it is non-NULL, or to a file stored
// in the same directory as the DB contents if info_log is NULL.
// Default: NULL
- WritableFile* info_log;
+ Logger* info_log;
// -------------------
// Parameters that affect performance
diff --git a/port/atomic_pointer.h b/port/atomic_pointer.h
index c618778..c20b1bd 100644
--- a/port/atomic_pointer.h
+++ b/port/atomic_pointer.h
@@ -48,9 +48,8 @@ namespace port {
// http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx
#define LEVELDB_HAVE_MEMORY_BARRIER
-// Gcc and Sun Studio on x86
-#elif defined(ARCH_CPU_X86_FAMILY) && \
- (defined(__GNUC__) || defined(__SUNPRO_CC))
+// Gcc on x86
+#elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__)
inline void MemoryBarrier() {
// See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
// this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
@@ -58,6 +57,15 @@ inline void MemoryBarrier() {
}
#define LEVELDB_HAVE_MEMORY_BARRIER
+// Sun Studio
+#elif defined(ARCH_CPU_X86_FAMILY) && defined(__SUNPRO_CC)
+inline void MemoryBarrier() {
+ // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
+ // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
+ asm volatile("" : : : "memory");
+}
+#define LEVELDB_HAVE_MEMORY_BARRIER
+
// Mac OS
#elif defined(OS_MACOSX)
inline void MemoryBarrier() {
diff --git a/port/port_android.h b/port/port_android.h
index 13df9c9..d68b6c0 100644
--- a/port/port_android.h
+++ b/port/port_android.h
@@ -126,10 +126,16 @@ inline bool Snappy_Compress(
}
// TODO(gabor): Implement uncompress
+inline bool Snappy_GetUncompressedLength(const char* input, size_t length,
+ size_t* result) {
+ return false;
+}
+
+// TODO(gabor): Implement uncompress
inline bool Snappy_Uncompress(
const char* input_data,
size_t input_length,
- std::string* output) {
+ char* output) {
return false;
}
diff --git a/port/port_chromium.cc b/port/port_chromium.cc
index 2ab49b9..7f6de92 100644
--- a/port/port_chromium.cc
+++ b/port/port_chromium.cc
@@ -62,15 +62,19 @@ bool Snappy_Compress(const char* input, size_t input_length,
#endif
}
+bool Snappy_GetUncompressedLength(const char* input, size_t length,
+ size_t* result) {
+#if defined(USE_SNAPPY)
+ return snappy::GetUncompressedLength(input_data, input_length, result);
+#else
+ return false;
+#endif
+}
+
bool Snappy_Uncompress(const char* input_data, size_t input_length,
- std::string* output) {
+ char* output) {
#if defined(USE_SNAPPY)
- size_t ulength;
- if (!snappy::GetUncompressedLength(input_data, input_length, &ulength)) {
- return false;
- }
- output->resize(ulength);
- return snappy::RawUncompress(input_data, input_length, &(*output)[0]);
+ return snappy::RawUncompress(input_data, input_length, output);
#else
return false;
#endif
diff --git a/port/port_chromium.h b/port/port_chromium.h
index 1851e6e..feecd5b 100644
--- a/port/port_chromium.h
+++ b/port/port_chromium.h
@@ -84,8 +84,10 @@ class AtomicPointer {
bool Snappy_Compress(const char* input, size_t input_length,
std::string* output);
+bool Snappy_GetUncompressedLength(const char* input, size_t length,
+ size_t* result);
bool Snappy_Uncompress(const char* input_data, size_t input_length,
- std::string* output);
+ char* output);
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
return false;
diff --git a/port/port_example.h b/port/port_example.h
index 8a624f3..6bd9b49 100644
--- a/port/port_example.h
+++ b/port/port_example.h
@@ -96,11 +96,21 @@ class AtomicPointer {
extern bool Snappy_Compress(const char* input, size_t input_length,
std::string* output);
+// If input[0,input_length-1] looks like a valid snappy compressed
+// buffer, store the size of the uncompressed data in *result and
+// return true. Else return false.
+extern bool Snappy_GetUncompressedLength(const char* input, size_t length,
+ size_t* result);
+
// Attempt to snappy uncompress input[0,input_length-1] into *output.
// Returns true if successful, false if the input is invalid lightweight
// compressed data.
+//
+// REQUIRES: at least the first "n" bytes of output[] must be writable
+// where "n" is the result of a successful call to
+// Snappy_GetUncompressedLength.
extern bool Snappy_Uncompress(const char* input_data, size_t input_length,
- std::string* output);
+ char* output);
// ------------------ Miscellaneous -------------------
diff --git a/port/port_posix.h b/port/port_posix.h
index 2995026..ef01de3 100644
--- a/port/port_posix.h
+++ b/port/port_posix.h
@@ -80,12 +80,12 @@ class CondVar {
Mutex* mu_;
};
-inline bool Snappy_Compress(const char* input, size_t input_length,
+inline bool Snappy_Compress(const char* input, size_t length,
::std::string* output) {
#ifdef SNAPPY
- output->resize(snappy::MaxCompressedLength(input_length));
+ output->resize(snappy::MaxCompressedLength(length));
size_t outlen;
- snappy::RawCompress(input, input_length, &(*output)[0], &outlen);
+ snappy::RawCompress(input, length, &(*output)[0], &outlen);
output->resize(outlen);
return true;
#endif
@@ -93,18 +93,22 @@ inline bool Snappy_Compress(const char* input, size_t input_length,
return false;
}
-inline bool Snappy_Uncompress(const char* input_data, size_t input_length,
- ::std::string* output) {
+inline bool Snappy_GetUncompressedLength(const char* input, size_t length,
+ size_t* result) {
#ifdef SNAPPY
- size_t ulength;
- if (!snappy::GetUncompressedLength(input_data, input_length, &ulength)) {
- return false;
- }
- output->resize(ulength);
- return snappy::RawUncompress(input_data, input_length, &(*output)[0]);
+ return snappy::GetUncompressedLength(input, length, result);
+#else
+ return false;
#endif
+}
+inline bool Snappy_Uncompress(const char* input, size_t length,
+ char* output) {
+#ifdef SNAPPY
+ return snappy::RawUncompress(input, length, output);
+#else
return false;
+#endif
}
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
diff --git a/table/format.cc b/table/format.cc
index 63971db..ba7838c 100644
--- a/table/format.cc
+++ b/table/format.cc
@@ -107,16 +107,20 @@ Status ReadBlock(RandomAccessFile* file,
// Ok
break;
case kSnappyCompression: {
- std::string decompressed;
- if (!port::Snappy_Uncompress(data, n, &decompressed)) {
+ size_t ulength = 0;
+ if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
delete[] buf;
- s = Status::Corruption("corrupted compressed block contents");
- return s;
+ return Status::Corruption("corrupted compressed block contents");
}
- delete[] buf; // Done with uncompressed data
- buf = new char[decompressed.size()];
- memcpy(buf, decompressed.data(), decompressed.size());
- n = decompressed.size();
+ char* ubuf = new char[ulength];
+ if (!port::Snappy_Uncompress(data, n, ubuf)) {
+ delete[] buf;
+ delete[] ubuf;
+ return Status::Corruption("corrupted compressed block contents");
+ }
+ delete[] buf;
+ buf = ubuf;
+ n = ulength;
break;
}
default:
diff --git a/util/env.cc b/util/env.cc
index e5297e7..79e493e 100644
--- a/util/env.cc
+++ b/util/env.cc
@@ -18,14 +18,19 @@ RandomAccessFile::~RandomAccessFile() {
WritableFile::~WritableFile() {
}
+Logger::~Logger() {
+}
+
FileLock::~FileLock() {
}
-void Log(Env* env, WritableFile* info_log, const char* format, ...) {
- va_list ap;
- va_start(ap, format);
- env->Logv(info_log, format, ap);
- va_end(ap);
+void Log(Logger* info_log, const char* format, ...) {
+ if (info_log != NULL) {
+ va_list ap;
+ va_start(ap, format);
+ info_log->Logv(format, ap);
+ va_end(ap);
+ }
}
Status WriteStringToFile(Env* env, const Slice& data,
diff --git a/util/env_chromium.cc b/util/env_chromium.cc
index 1af525a..975386b 100644
--- a/util/env_chromium.cc
+++ b/util/env_chromium.cc
@@ -23,6 +23,7 @@
#include "leveldb/slice.h"
#include "port/port.h"
#include "util/logging.h"
+#include "util/posix_logger.h"
#if defined(OS_WIN)
#include <io.h>
@@ -406,9 +407,8 @@ class ChromiumEnv : public Env {
return Status::OK();
}
- virtual void Logv(WritableFile* info_log, const char* format, va_list ap) {
- // TODO(jorlow): We may want to just use Chromium's built in logging.
-
+ // TODO(user,user): Use Chromium's built-in logging?
+ static uint64_t gettid() {
uint64_t thread_id = 0;
// Coppied from base/logging.cc.
#if defined(OS_WIN)
@@ -422,65 +422,17 @@ class ChromiumEnv : public Env {
pthread_t tid = pthread_self();
memcpy(&thread_id, &tid, min(sizeof(r), sizeof(tid)));
#endif
+ return thread_id;
+ }
- // We try twice: the first time with a fixed-size stack allocated buffer,
- // and the second time with a much larger dynamically allocated buffer.
- char buffer[500];
- for (int iter = 0; iter < 2; iter++) {
- char* base;
- int bufsize;
- if (iter == 0) {
- bufsize = sizeof(buffer);
- base = buffer;
- } else {
- bufsize = 30000;
- base = new char[bufsize];
- }
- char* p = base;
- char* limit = base + bufsize;
-
- ::base::Time::Exploded t;
- ::base::Time::Now().LocalExplode(&t);
- p += snprintf(p, limit - p,
- "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
- t.year,
- t.month,
- t.day_of_month,
- t.hour,
- t.minute,
- t.second,
- static_cast<int>(t.millisecond) * 1000,
- static_cast<long long unsigned int>(thread_id));
-
- // Print the message
- if (p < limit) {
- va_list backup_ap;
- va_copy(backup_ap, ap);
- p += vsnprintf(p, limit - p, format, backup_ap);
- va_end(backup_ap);
- }
-
- // Truncate to available space if necessary
- if (p >= limit) {
- if (iter == 0) {
- continue; // Try again with larger buffer
- } else {
- p = limit - 1;
- }
- }
-
- // Add newline if necessary
- if (p == base || p[-1] != '\n') {
- *p++ = '\n';
- }
-
- assert(p <= limit);
- info_log->Append(Slice(base, p - base));
- info_log->Flush();
- if (base != buffer) {
- delete[] base;
- }
- break;
+ virtual Status NewLogger(const std::string& fname, Logger** result) {
+ FILE* f = fopen(fname.c_str(), "w");
+ if (f == NULL) {
+ *result = NULL;
+ return Status::IOError(fname, strerror(errno));
+ } else {
+ *result = new PosixLogger(f, &ChromiumEnv::gettid);
+ return Status::OK();
}
}
diff --git a/util/env_posix.cc b/util/env_posix.cc
index 46723e2..5127c89 100644
--- a/util/env_posix.cc
+++ b/util/env_posix.cc
@@ -23,6 +23,7 @@
#include "leveldb/slice.h"
#include "port/port.h"
#include "util/logging.h"
+#include "util/posix_logger.h"
namespace leveldb {
@@ -427,72 +428,21 @@ class PosixEnv : public Env {
return Status::OK();
}
- virtual void Logv(WritableFile* info_log, const char* format, va_list ap) {
+ static uint64_t gettid() {
pthread_t tid = pthread_self();
uint64_t thread_id = 0;
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
+ return thread_id;
+ }
- // We try twice: the first time with a fixed-size stack allocated buffer,
- // and the second time with a much larger dynamically allocated buffer.
- char buffer[500];
- for (int iter = 0; iter < 2; iter++) {
- char* base;
- int bufsize;
- if (iter == 0) {
- bufsize = sizeof(buffer);
- base = buffer;
- } else {
- bufsize = 30000;
- base = new char[bufsize];
- }
- char* p = base;
- char* limit = base + bufsize;
-
- struct timeval now_tv;
- gettimeofday(&now_tv, NULL);
- const time_t seconds = now_tv.tv_sec;
- struct tm t;
- localtime_r(&seconds, &t);
- p += snprintf(p, limit - p,
- "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
- t.tm_year + 1900,
- t.tm_mon + 1,
- t.tm_mday,
- t.tm_hour,
- t.tm_min,
- t.tm_sec,
- static_cast<int>(now_tv.tv_usec),
- static_cast<long long unsigned int>(thread_id));
-
- // Print the message
- if (p < limit) {
- va_list backup_ap;
- va_copy(backup_ap, ap);
- p += vsnprintf(p, limit - p, format, backup_ap);
- va_end(backup_ap);
- }
-
- // Truncate to available space if necessary
- if (p >= limit) {
- if (iter == 0) {
- continue; // Try again with larger buffer
- } else {
- p = limit - 1;
- }
- }
-
- // Add newline if necessary
- if (p == base || p[-1] != '\n') {
- *p++ = '\n';
- }
-
- assert(p <= limit);
- info_log->Append(Slice(base, p - base));
- info_log->Flush();
- if (base != buffer) {
- delete[] base;
- }
- break;
+ virtual Status NewLogger(const std::string& fname, Logger** result) {
+ FILE* f = fopen(fname.c_str(), "w");
+ if (f == NULL) {
+ *result = NULL;
+ return IOError(fname, errno);
+ } else {
+ *result = new PosixLogger(f, &PosixEnv::gettid);
+ return Status::OK();
}
}
diff --git a/util/posix_logger.h b/util/posix_logger.h
new file mode 100644
index 0000000..0dbdeaa
--- /dev/null
+++ b/util/posix_logger.h
@@ -0,0 +1,97 @@
+// Copyright 2011 Google Inc. All Rights Reserved.
+// Author: sanjay@google.com (Sanjay Ghemawat)
+//
+// Logger implementation that can be shared by all environments
+// where enough posix functionality is available.
+
+#ifndef STORAGE_LEVELDB_UTIL_POSIX_LOGGER_H_
+#define STORAGE_LEVELDB_UTIL_POSIX_LOGGER_H_
+
+#include <algorithm>
+#include <stdio.h>
+#include <sys/time.h>
+#include <time.h>
+#include "leveldb/env.h"
+
+namespace leveldb {
+
+class PosixLogger : public Logger {
+ private:
+ FILE* file_;
+ uint64_t (*gettid_)(); // Return the thread id for the current thread
+ public:
+ PosixLogger(FILE* f, uint64_t (*gettid)()) : file_(f), gettid_(gettid) { }
+ virtual ~PosixLogger() {
+ fclose(file_);
+ }
+ virtual void Logv(const char* format, va_list ap) {
+ const uint64_t thread_id = (*gettid_)();
+
+ // We try twice: the first time with a fixed-size stack allocated buffer,
+ // and the second time with a much larger dynamically allocated buffer.
+ char buffer[500];
+ for (int iter = 0; iter < 2; iter++) {
+ char* base;
+ int bufsize;
+ if (iter == 0) {
+ bufsize = sizeof(buffer);
+ base = buffer;
+ } else {
+ bufsize = 30000;
+ base = new char[bufsize];
+ }
+ char* p = base;
+ char* limit = base + bufsize;
+
+ struct timeval now_tv;
+ gettimeofday(&now_tv, NULL);
+ const time_t seconds = now_tv.tv_sec;
+ struct tm t;
+ localtime_r(&seconds, &t);
+ p += snprintf(p, limit - p,
+ "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
+ t.tm_year + 1900,
+ t.tm_mon + 1,
+ t.tm_mday,
+ t.tm_hour,
+ t.tm_min,
+ t.tm_sec,
+ static_cast<int>(now_tv.tv_usec),
+ static_cast<long long unsigned int>(thread_id));
+
+ // Print the message
+ if (p < limit) {
+ va_list backup_ap;
+ va_copy(backup_ap, ap);
+ p += vsnprintf(p, limit - p, format, backup_ap);
+ va_end(backup_ap);
+ }
+
+ // Truncate to available space if necessary
+ if (p >= limit) {
+ if (iter == 0) {
+ continue; // Try again with larger buffer
+ } else {
+ p = limit - 1;
+ }
+ }
+
+ // Add newline if necessary
+ if (p == base || p[-1] != '\n') {
+ *p++ = '\n';
+ }
+
+ assert(p <= limit);
+ fwrite(base, 1, p - base, file_);
+ fflush(file_);
+ if (base != buffer) {
+ delete[] base;
+ }
+ break;
+ }
+ }
+};
+
+}
+
+#endif // STORAGE_LEVELDB_UTIL_POSIX_LOGGER_H_