summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2014-11-04 14:58:47 +0000
committerKim van der Riet <kpvdr@apache.org>2014-11-04 14:58:47 +0000
commit6c7050f15797fdfc1481aab9a2b3a13b71fa79dd (patch)
tree07724964b5d6502c1fe65eb757aa7d9d0a06b5de /qpid/cpp/src
parentfa4710228a7b0649260a602c61e56e30e984d9aa (diff)
downloadqpid-python-6c7050f15797fdfc1481aab9a2b3a13b71fa79dd.tar.gz
QPID-5671: [linearstore] Add ability to use disk partitions and select per-queue EFPs. This is the first part of resolving this issue and changes the journal directory structure to use symlinks instead of moving the actual files. This opens the way to having files from multiple partitions in the same journal. Other small improvements and tidy-ups also included.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1636598 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/linearstore/ISSUES43
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp11
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp162
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h24
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp66
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp62
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h9
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp51
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h3
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.h1
16 files changed, 273 insertions, 179 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index da169f57d6..1c50f7e2cb 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -25,31 +25,24 @@ Current/pending:
------ ------- ----------------------
5359 - Linearstore: Implement new management schema and wire into store
5360 - Linearstore: Evaluate and rework logging to produce a consistent log output
- 5361 - Linearstore: No tests for linearstore functionality currently exist
+ 5361 1145359 Linearstore: No tests for linearstore functionality currently exist
svn r.1564893 2014-02-05: Added tx-test-soak.sh
svn r.1564935 2014-02-05: Added license text to tx-test-soak.sh
+ svn r.1625283 2014-09-16: Basic python tests from legacystore ported over to linearstore
* No existing tests for linearstore:
** Basic broker-level tests for txn and non-txn recovery
** Store-level tests which check write boundary conditions
** EFP tests, including file recovery, error management
** Unit tests
** Basic performance tests
- 5362 - Linearstore: No store tools exist for examining the journals
- svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
- svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
- svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
- svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
- svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze
- svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
- * Store analysis and status
- * Recovery/reading of message content
- * Empty file pool status and management
5464 - [linearstore] Incompletely created journal files accumulate in EFP
- - 1088944 [Linearstore] store does not return all files to EFP after purging big queue
- - 1078937 [linearstore] Installation and tests for new store analysis tool qpid-qls-analyze
- svn r.1596633 2014-05-21: Modified to run from installed location
-* 6043 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation
-* - 1089652 [RFE]: Configuration option for linear store to delete the used journal files instead of recycling them.
+ - 1088944 [Linearstore] store does not return all files to EFP after purging big queue <queue purge issue>
+ 6043 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation
+ - 1067480 [LinearStore] Provide a way to limit max count/size of empty files in EFP
+ - 1067429 [LinearStore] last file from deleted queue is not moved to EFP <queue delete issue>
+ - 1067482 [LinearStore] Provide a way to prealocate empty pages in EFP
+ 5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs
+
Fixed/closed:
@@ -118,12 +111,24 @@ NO-JIRA - Added missing Apache copyright/license text
5651 - [C++ broker] segfault in qpid::linearstore::journal::jdir::clear_dir when declaring durable queue
svn r.1582730 2014-03-28 Proposed fix by Pavel Moravec
* Bug introduced by r.1578899.
+ 5362 1145363 Linearstore: No store tools exist for examining the journals
+ svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+ svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
+ svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
+ * Store analysis and status
+ * Recovery/reading of message content
+ * Empty file pool status and management
5661 - [linearstore] Set default cmake build to exclude linearstore
svn r.1584379 2014-04-03 Proposed solution.
* Run ccmake, select BUILD_LINEARSTORE to change its value to ON to build.
5750 1078142 [linearstore] qpidd closes connection with (distributed) transactional client while checking previous transaction, broker signals error (closed by error: Queue Ve0-2: async_dequeue() failed: exception 0x0103 wmgr::get_events() threw JERR__AIO: AIO error)
svn r.1594215 2014-05-13 Proposed solution.
* jexception 0x0103 wmgr::get_events() threw JERR__AIO: AIO error. (AIO write operation failed: Invalid argument (-22) [pg=0 size=8192 offset=4096 fh=22])
+ 5655 1078937 [linearstore] Installation and tests for new store analysis tool qpid-qls-analyze
+ svn r.1596633 2014-05-21: Modified to run from installed location
5767 1098118 [linearstore] broker segfaults when recovering journal file with damaged header
svn r.1596509 2014-05-21 Proposed solution (committed by pmoravec)
svn r.1599243 2014-06-02 Solution to additional case of file header corruption
@@ -134,6 +139,10 @@ NO-JIRA - Added missing Apache copyright/license text
This turned out to be an AMQP error, fix does not affect store code.
6043 1089652 [RFE]: Configuration option for linear store to delete or overwrite the used journal files.
svn r.1620426 2014-08-25 Proposed solution
+ 6147 1152012 [C++ broker linearstore] missing journal id in "trace Mgmt create journal." log
+ svn r.1631360 2014-10-13 Proposed solution
+ 6157 1150397 linearstore: segfault when 2 journals request new journal file from empty EFP
+ svn r.1632504 2014-10-17 Proposed solution by pmoravec
Ordered checkin list:
@@ -175,6 +184,8 @@ no. svn r Q-JIRA RHBZ Date Alt Committer
30. 1599243 5767 1098118 2014-06-02
31. 1614665 5924 1124906 2014-07-30
32. 1620426 6043 1089652 2014-08-25
+33. 1631360 6147 1152012 2014-10-13 (pmoravec)
+34. 1632504 6157 1150397 2014-10-17 (pmoravec)
See above sections for details on these checkins.
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 8e4c6e38e1..a5dd63da62 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -201,8 +201,7 @@ bool MessageStoreImpl::init(const std::string& storeDir_,
if (truncateFlag_)
truncateInit();
- else
- init();
+ init(truncateFlag_);
QLS_LOG(info, "Store module initialized; store-dir=" << storeDir_);
QLS_LOG(info, "> Default EFP partition: " << defaultEfpPartitionNumber);
@@ -218,7 +217,7 @@ bool MessageStoreImpl::init(const std::string& storeDir_,
return isInit;
}
-void MessageStoreImpl::init()
+void MessageStoreImpl::init(const bool truncateFlag)
{
const int retryMax = 3;
int bdbRetryCnt = 0;
@@ -296,6 +295,7 @@ void MessageStoreImpl::init()
defaultEfpPartitionNumber,
defaultEfpFileSize_kib,
overwriteBeforeReturnFlag,
+ truncateFlag,
jrnlLog));
efpMgr->findEfpPartitions();
}
@@ -337,13 +337,12 @@ void MessageStoreImpl::truncateInit()
isInit = false;
}
- // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s).
-
qpid::linearstore::journal::jdir::delete_dir(getBdbBaseDir());
+
+ // TODO: Linearstore: harvest all discarded journal files into the empty file pool(s).
qpid::linearstore::journal::jdir::delete_dir(getJrnlBaseDir());
qpid::linearstore::journal::jdir::delete_dir(getTplBaseDir());
QLS_LOG(info, "Store directory " << getStoreTopLevelDir() << " was truncated.");
- init();
}
void MessageStoreImpl::chkTplStoreInit()
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index 51e02ff395..236fcf2cf8 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -157,7 +157,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
static qpid::linearstore::journal::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKiB,
const std::string& paramName);
- void init();
+ void init(const bool truncateFlag);
void recoverQueues(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
index a69ab09001..371443f46b 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
@@ -33,37 +33,53 @@
#include <unistd.h>
#include <vector>
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
+// static
+std::string EmptyFilePool::s_inuseFileDirectory_ = "in_use";
+
+// static
+std::string EmptyFilePool::s_returnedFileDirectory_ = "returned";
+
EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
const EmptyFilePoolPartition* partitionPtr,
+ const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef) :
efpDirectory_(efpDirectory),
efpDataSize_kib_(dataSizeFromDirName_kib(efpDirectory, partitionPtr->getPartitionNumber())),
partitionPtr_(partitionPtr),
+ overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+ truncateFlag_(truncateFlag),
journalLogRef_(journalLogRef)
{}
EmptyFilePool::~EmptyFilePool() {}
void EmptyFilePool::initialize() {
+//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition " << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ << std::endl; // DEBUG
std::vector<std::string> dirList;
+
+ // Process empty files in main dir
jdir::read_dir(efpDirectory_, dirList, false, true, false, false);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
size_t dotPos = i->rfind(".");
if (dotPos != std::string::npos) {
if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
- std::string emptyFile(efpDirectory_ + "/" + (*i));
- if (validateEmptyFile(emptyFile)) {
- pushEmptyFile(emptyFile);
+ std::string emptyFileName(efpDirectory_ + "/" + (*i));
+ if (validateEmptyFile(emptyFileName)) {
+ pushEmptyFile(emptyFileName);
}
}
}
}
+
+ // Create 'in_use' and 'returned' subdirs if they don't already exist
+ // Retern files to EFP in 'in_use' and 'returned' subdirs if they do exist
+ initializeSubDirectory(efpDirectory_ + "/" + s_inuseFileDirectory_);
+ initializeSubDirectory(efpDirectory_ + "/" + s_returnedFileDirectory_);
}
efpDataSize_kib_t EmptyFilePool::dataSize_kib() const {
@@ -106,36 +122,29 @@ const efpIdentity_t EmptyFilePool::getIdentity() const {
std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
std::string emptyFileName = popEmptyFile();
- std::string newFileName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
- if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+ std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/'));
+ std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
+ if (moveFile(emptyFileName, newFileName)) {
// Try again with new UUID for file name
- newFileName = destDirectory + "/" + getEfpFileName();
- if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+ newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName();
+ if (moveFile(emptyFileName, newFileName)) {
+//std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG
pushEmptyFile(emptyFileName);
std::ostringstream oss;
oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
}
}
- return newFileName;
+ if (createSymLink(newFileName, symlinkName)) {
+ std::ostringstream oss;
+ oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile");
+ }
+ return symlinkName;
}
-void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile) {
- std::string emptyFileName(efpDirectory_ + fqSrcFile.substr(fqSrcFile.rfind('/'))); // NOTE: substr() includes leading '/'
- if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
- // Try again with new UUID for file name
- emptyFileName = efpDirectory_ + "/" + getEfpFileName();
- if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
- // Failed twice in a row - delete file
- ::unlink(fqSrcFile.c_str());
- return;
- }
- }
- resetEmptyFileHeader(emptyFileName);
- if (partitionPtr_->getOverwriteBeforeReturnFlag()) {
- overwriteFileContents(emptyFileName);
- }
- pushEmptyFile(emptyFileName);
+void EmptyFilePool::returnEmptyFileSymlink(const std::string& emptyFileSymlink) {
+ returnEmptyFile(deleteSymlink(emptyFileSymlink));
}
//static
@@ -173,12 +182,12 @@ efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirN
// --- protected functions ---
-// WARNING: this method needs to be called under the scope of emptyFileListMutex_ lock
-void EmptyFilePool::createEmptyFile() {
+std::string EmptyFilePool::createEmptyFile() {
std::string efpfn = getEfpFileName();
- if (overwriteFileContents(efpfn)) {
- emptyFileList_.push_back(efpfn);
+ if (!overwriteFileContents(efpfn)) {
+ // TODO: handle failure to prepare new file here
}
+ return efpfn;
}
std::string EmptyFilePool::getEfpFileName() {
@@ -188,6 +197,27 @@ std::string EmptyFilePool::getEfpFileName() {
return oss.str();
}
+void EmptyFilePool::initializeSubDirectory(const std::string& fqDirName) {
+ std::vector<std::string> dirList;
+ if (jdir::exists(fqDirName)) {
+ if (truncateFlag_) {
+ jdir::read_dir(fqDirName, dirList, false, true, false, false);
+ for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+ size_t dotPos = i->rfind(".");
+ if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
+ returnEmptyFile(fqDirName + "/" + (*i));
+ } else {
+ std::ostringstream oss;
+ oss << "File \'" << *i << "\' was not a journal file and was not returned to EFP.";
+ journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+ }
+ }
+ }
+ } else {
+ jdir::create_dir(fqDirName);
+ }
+}
+
bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
::file_hdr_t fh;
::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
@@ -199,22 +229,27 @@ bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
ofs.put('\0');
ofs.close();
return true;
-//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
+//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
} else {
-//std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
+//std::cerr << "*** ERROR: Unable to open file \"" << fqFileName << "\"" << std::endl; // DEBUG
}
return false;
}
std::string EmptyFilePool::popEmptyFile() {
std::string emptyFileName;
+ bool listEmptyFlag;
{
slock l(emptyFileListMutex_);
- if (emptyFileList_.empty()) {
- createEmptyFile();
+ listEmptyFlag = emptyFileList_.empty();
+ if (!listEmptyFlag) {
+ emptyFileName = emptyFileList_.front();
+ emptyFileList_.pop_front();
}
- emptyFileName = emptyFileList_.front();
- emptyFileList_.pop_front();
+ }
+ // If the list is empty, create a new file and return the file name.
+ if (listEmptyFlag) {
+ emptyFileName = createEmptyFile();
}
return emptyFileName;
}
@@ -224,6 +259,28 @@ void EmptyFilePool::pushEmptyFile(const std::string fqFileName) {
emptyFileList_.push_back(fqFileName);
}
+void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) {
+ std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
+ if (moveFile(emptyFileName, returnedFileName)) {
+ ::unlink(emptyFileName.c_str());
+//std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " << returnedFileName << "; deleted." << std::endl; // DEBUG
+ }
+
+ // TODO: On a separate thread, process returned files by overwriting headers and, optionally, their contents and
+ // returning them to the EFP directory
+ resetEmptyFileHeader(returnedFileName);
+ if (overwriteBeforeReturnFlag_) {
+ overwriteFileContents(returnedFileName);
+ }
+ std::string sanitizedEmptyFileName = efpDirectory_ + returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() includes leading '/'
+ if (moveFile(returnedFileName, sanitizedEmptyFileName)) {
+ ::unlink(returnedFileName.c_str());
+//std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to " << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG
+ } else {
+ pushEmptyFile(sanitizedEmptyFileName);
+ }
+}
+
void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) {
std::fstream fs(fqFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
if (fs.good()) {
@@ -239,14 +296,14 @@ void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) {
fs.write(buff, buffsize);
std::streampos bytesWritten = fs.tellp();
if (std::streamoff(bytesWritten) != buffsize) {
-//std::cerr << "ERROR: Unable to write file header of file \"" << fqFileName_ << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl;
+//std::cerr << "*** ERROR: Unable to write file header of file \"" << fqFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl; // DEBUG
}
} else {
-//std::cerr << "ERROR: Unable to read file header of file \"" << fqFileName_ << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl;
+//std::cerr << "*** ERROR: Unable to read file header of file \"" << fqFileName << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl; // DEBUG
}
fs.close();
} else {
-//std::cerr << "ERROR: Unable to open file \"" << fqFileName_ << "\" for reading" << std::endl; // DEBUG
+//std::cerr << "*** ERROR: Unable to open file \"" << fqFileName << "\" for reading" << std::endl; // DEBUG
}
}
@@ -329,7 +386,7 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
}
// static
-int EmptyFilePool::moveEmptyFile(const std::string& from,
+int EmptyFilePool::moveFile(const std::string& from,
const std::string& to) {
if (::rename(from.c_str(), to.c_str())) {
if (errno == EEXIST) return errno; // File name exists
@@ -340,4 +397,29 @@ int EmptyFilePool::moveEmptyFile(const std::string& from,
return 0;
}
+//static
+int EmptyFilePool::createSymLink(const std::string& fqFileName,
+ const std::string& fqLinkName) {
+ if(::symlink(fqFileName.c_str(), fqLinkName.c_str())) {
+ if (errno == EEXIST) return errno; // File name exists
+ std::ostringstream oss;
+ oss << "file=\"" << fqFileName << "\" symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "createSymLink");
+ }
+ return 0;
+}
+
+//static
+std::string EmptyFilePool::deleteSymlink(const std::string& fqLinkName) {
+ char buff[1024];
+ ssize_t len = ::readlink(fqLinkName.c_str(), buff, 1024);
+ if (len < 0) {
+ std::ostringstream oss;
+ oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink");
+ }
+ ::unlink(fqLinkName.c_str());
+ return std::string(buff, len);
+}
+
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
index 3a2c93f769..dbc3992f46 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
@@ -44,11 +44,16 @@ class EmptyFilePool
{
protected:
typedef std::deque<std::string> emptyFileList_t;
- typedef emptyFileList_t::iterator emptyFileListItr_t;
+ typedef emptyFileList_t::const_iterator emptyFileListConstItr_t;
+
+ static std::string s_inuseFileDirectory_;
+ static std::string s_returnedFileDirectory_;
const std::string efpDirectory_;
const efpDataSize_kib_t efpDataSize_kib_;
const EmptyFilePoolPartition* partitionPtr_;
+ const bool overwriteBeforeReturnFlag_;
+ const bool truncateFlag_;
JournalLog& journalLogRef_;
private:
@@ -58,6 +63,8 @@ private:
public:
EmptyFilePool(const std::string& efpDirectory,
const EmptyFilePoolPartition* partitionPtr,
+ const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef);
virtual ~EmptyFilePool();
@@ -73,23 +80,28 @@ public:
const efpIdentity_t getIdentity() const;
std::string takeEmptyFile(const std::string& destDirectory);
- void returnEmptyFile(const std::string& srcFile);
+ void returnEmptyFileSymlink(const std::string& emptyFileSymlink);
static std::string dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib);
static efpDataSize_kib_t dataSizeFromDirName_kib(const std::string& dirName,
const efpPartitionNumber_t partitionNumber);
protected:
- void createEmptyFile();
+ std::string createEmptyFile();
std::string getEfpFileName();
- std::string popEmptyFile();
+ void initializeSubDirectory(const std::string& fqDirName);
bool overwriteFileContents(const std::string& fqFileName);
+ std::string popEmptyFile();
void pushEmptyFile(const std::string fqFileName);
+ void returnEmptyFile(const std::string& emptyFileName);
void resetEmptyFileHeader(const std::string& fqFileName);
bool validateEmptyFile(const std::string& emptyFileName) const;
- static int moveEmptyFile(const std::string& fromFqPath,
- const std::string& toFqPath);
+ static int moveFile(const std::string& fromFqPath,
+ const std::string& toFqPath);
+ static int createSymLink(const std::string& fqFileName,
+ const std::string& fqLinkName);
+ static std::string deleteSymlink(const std::string& fqLinkName);
};
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
index e5fd44bd2d..28e1b0b56e 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
@@ -27,8 +27,6 @@
#include "qpid/linearstore/journal/JournalLog.h"
#include "qpid/linearstore/journal/slock.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
@@ -37,11 +35,13 @@ EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath,
const efpPartitionNumber_t defaultPartitionNumber,
const efpDataSize_kib_t defaultEfpDataSize_kib,
const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef) :
qlsStorePath_(qlsStorePath),
defaultPartitionNumber_(defaultPartitionNumber),
defaultEfpDataSize_kib_(defaultEfpDataSize_kib),
overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+ truncateFlag_(truncateFlag),
journalLogRef_(journalLogRef)
{}
@@ -63,20 +63,7 @@ void EmptyFilePoolManager::findEfpPartitions() {
efpPartitionNumber_t pn = EmptyFilePoolPartition::getPartitionNumber(*i);
if (pn > 0) { // valid partition name found
std::string fullDirPath(qlsStorePath_ + "/" + (*i));
- EmptyFilePoolPartition* efppp = 0;
- try {
- efppp = new EmptyFilePoolPartition(pn, fullDirPath, overwriteBeforeReturnFlag_, journalLogRef_);
- {
- slock l(partitionMapMutex_);
- partitionMap_[pn] = efppp;
- }
- } catch (const std::exception& e) {
- if (efppp != 0) {
- delete efppp;
- efppp = 0;
- }
-//std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl; // DEBUG
- }
+ EmptyFilePoolPartition* efppp = insertPartition(pn, fullDirPath);
if (efppp != 0)
efppp->findEmptyFilePools();
foundPartition = true;
@@ -85,37 +72,28 @@ void EmptyFilePoolManager::findEfpPartitions() {
// If no partition was found, create an empty default partition.
if (!foundPartition) {
- journalLogRef_.log(JournalLog::LOG_INFO, "No EFP partition found, creating an empty partition.");
- std::ostringstream oss;
- oss << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
- << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
- jdir::create_dir(oss.str());
+ std::ostringstream oss1;
+ oss1 << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
+ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
+ jdir::create_dir(oss1.str());
+ insertPartition(defaultPartitionNumber_, oss1.str());
+ std::ostringstream oss2;
+ oss2 << "No EFP partition found, creating an empty partition at " << oss1.str();
+ journalLogRef_.log(JournalLog::LOG_INFO, oss2.str());
}
}
journalLogRef_.log(JournalLog::LOG_INFO, "EFP Manager initialization complete");
std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*> partitionList;
- std::vector<qpid::linearstore::journal::EmptyFilePool*> filePoolList;
getEfpPartitions(partitionList);
if (partitionList.size() == 0) {
journalLogRef_.log(JournalLog::LOG_WARN, "NO EFP PARTITIONS FOUND! No queue creation is possible.");
} else {
std::stringstream oss;
- oss << "> EFP Partitions found: " << partitionList.size();
+ oss << "EFP Partitions found: " << partitionList.size();
journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
for (std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
- filePoolList.clear();
- (*i)->getEmptyFilePools(filePoolList);
- std::stringstream oss;
- oss << " * Partition " << (*i)->getPartitionNumber() << " containing " << filePoolList.size()
- << " pool" << (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->getPartitionDirectory() << "\'";
- journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
- for (std::vector<qpid::linearstore::journal::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
- std::ostringstream oss;
- oss << " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
- " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB";
- journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
- }
+ journalLogRef_.log(JournalLog::LOG_INFO, (*i)->toString(5U));
}
}
}
@@ -210,4 +188,22 @@ uint16_t EmptyFilePoolManager::getNumEfpPartitions() const {
return partitionMap_.size();
}
+EmptyFilePoolPartition* EmptyFilePoolManager::insertPartition(const efpPartitionNumber_t pn, const std::string& fullPartitionPath) {
+ EmptyFilePoolPartition* efppp = 0;
+ try {
+ efppp = new EmptyFilePoolPartition(pn, fullPartitionPath, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
+ {
+ slock l(partitionMapMutex_);
+ partitionMap_[pn] = efppp;
+ }
+ } catch (const std::exception& e) {
+ if (efppp != 0) {
+ delete efppp;
+ efppp = 0;
+ }
+//std::cerr << "*** Unable to initialize partition " << pn << " (\'" << fullPartitionPath << "\'): " << e.what() << std::endl; // DEBUG
+ }
+ return efppp;
+}
+
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
index d7d08dcd25..d0aa7fa7d6 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
@@ -46,6 +46,7 @@ protected:
const efpPartitionNumber_t defaultPartitionNumber_;
const efpDataSize_kib_t defaultEfpDataSize_kib_;
const bool overwriteBeforeReturnFlag_;
+ const bool truncateFlag_;
JournalLog& journalLogRef_;
partitionMap_t partitionMap_;
smutex partitionMapMutex_;
@@ -55,6 +56,7 @@ public:
const efpPartitionNumber_t defaultPartitionNumber,
const efpDataSize_kib_t defaultEfpDataSize_kib,
const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef_);
virtual ~EmptyFilePoolManager();
@@ -72,6 +74,8 @@ public:
void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
const efpPartitionNumber_t efpPartitionNumber = 0);
uint16_t getNumEfpPartitions() const;
+protected:
+ EmptyFilePoolPartition* insertPartition(const efpPartitionNumber_t pn, const std::string& fullPartitionPath);
};
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
index f80268b0e6..c10caebc6f 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
@@ -26,22 +26,19 @@
#include "qpid/linearstore/journal/jdir.h"
#include "qpid/linearstore/journal/slock.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
-// static
-const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition
-
EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
const std::string& partitionDir,
const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef) :
partitionNum_(partitionNum),
partitionDir_(partitionDir),
overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+ truncateFlag_(truncateFlag),
journalLogRef_(journalLogRef)
{
validatePartitionDir();
@@ -57,25 +54,13 @@ EmptyFilePoolPartition::~EmptyFilePoolPartition() {
void
EmptyFilePoolPartition::findEmptyFilePools() {
-//std::cout << "Reading " << partitionDir << std::endl; // DEBUG
+//std::cout << "*** Reading " << partitionDir_ << std::endl; // DEBUG
std::vector<std::string> dirList;
- jdir::read_dir(partitionDir_, dirList, true, false, false, false);
- bool foundEfpDir = false;
- for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
- if (i->compare(s_efpTopLevelDir_) == 0) {
- foundEfpDir = true;
- break;
- }
- }
- if (foundEfpDir) {
- std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_);
-//std::cout << "Reading " << efpDir << std::endl; // DEBUG
- dirList.clear();
- jdir::read_dir(efpDir, dirList, true, false, false, true);
+ jdir::read_dir(partitionDir_, dirList, true, false, false, true);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
EmptyFilePool* efpp = 0;
try {
- efpp = new EmptyFilePool(*i, this, journalLogRef_);
+ efpp = new EmptyFilePool(*i, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
{
slock l(efpMapMutex_);
efpMap_[efpp->dataSize_kib()] = efpp;
@@ -88,13 +73,14 @@ EmptyFilePoolPartition::findEmptyFilePools() {
}
//std::cerr << "WARNING: " << e.what() << std::endl;
}
- if (efpp != 0)
+ if (efpp != 0) {
efpp->initialize();
+ }
}
- }
}
EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) {
+ slock l(efpMapMutex_);
efpMapItr_t i = efpMap_.find(efpDataSize_kib);
if (i == efpMap_.end())
return 0;
@@ -102,21 +88,19 @@ EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t
}
void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
+ slock l(efpMapMutex_);
for (efpMapItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
efpList.push_back(i->second);
}
}
void EmptyFilePoolPartition::getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList_kib) const {
+ slock l(efpMapMutex_);
for (efpMapConstItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
efpDataSizesList_kib.push_back(i->first);
}
}
-bool EmptyFilePoolPartition::getOverwriteBeforeReturnFlag() const {
- return overwriteBeforeReturnFlag_;
-}
-
std::string EmptyFilePoolPartition::getPartitionDirectory() const {
return partitionDir_;
}
@@ -125,6 +109,32 @@ efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber() const {
return partitionNum_;
}
+std::string EmptyFilePoolPartition::toString(const uint16_t indent) const {
+ std::string indentStr(indent, ' ');
+ std::stringstream oss;
+ oss << "EFP Partition " << partitionNum_ << ":" << std::endl;
+ oss << indentStr << "EFP Partition Analysis (partition " << partitionNum_ << " at \"" << partitionDir_ << "\"):" << std::endl;
+ if (efpMap_.empty()) {
+ oss << indentStr << "<Partition empty, no EFPs found>" << std::endl;
+ } else {
+ oss << indentStr << std::setw(12) << "efp_size_kib"
+ << std::setw(12) << "num_files"
+ << std::setw(18) << "tot_capacity_kib" << std::endl;
+ oss << indentStr << std::setw(12) << "------------"
+ << std::setw(12) << "----------"
+ << std::setw(18) << "----------------" << std::endl;
+ {
+ slock l(efpMapMutex_);
+ for (efpMapConstItr_t i=efpMap_.begin(); i!= efpMap_.end(); ++i) {
+ oss << indentStr << std::setw(12) << i->first
+ << std::setw(12) << i->second->numEmptyFiles()
+ << std::setw(18) << i->second->cumFileSize_kib() << std::endl;
+ }
+ }
+ }
+ return oss.str();
+}
+
// static
std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNumber_t partitionNumber) {
std::ostringstream oss;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
index 7eb9f70d52..c653c6be6a 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
@@ -37,8 +37,6 @@ class JournalLog;
class EmptyFilePoolPartition
{
-public:
- static const std::string s_efpTopLevelDir_;
protected:
typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t;
typedef efpMap_t::iterator efpMapItr_t;
@@ -47,6 +45,7 @@ protected:
const efpPartitionNumber_t partitionNum_;
const std::string partitionDir_;
const bool overwriteBeforeReturnFlag_;
+ const bool truncateFlag_;
JournalLog& journalLogRef_;
efpMap_t efpMap_;
smutex efpMapMutex_;
@@ -55,6 +54,7 @@ public:
EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
const std::string& partitionDir,
const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef);
virtual ~EmptyFilePoolPartition();
@@ -62,9 +62,9 @@ public:
EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const;
- bool getOverwriteBeforeReturnFlag() const;
std::string getPartitionDirectory() const;
efpPartitionNumber_t getPartitionNumber() const;
+ std::string toString(const uint16_t indent) const;
static std::string getPartionDirectoryName(const efpPartitionNumber_t partitionNumber);
static efpPartitionNumber_t getPartitionNumber(const std::string& name);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
index 14213f7955..4cae4e6538 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
@@ -23,6 +23,7 @@
#define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLTYPES_H_
#include <iostream>
+#include <sstream>
#include <stdint.h>
namespace qpid {
@@ -42,7 +43,13 @@ typedef struct efpIdentity_t {
efpIdentity_t() : pn_(0), ds_(0) {}
efpIdentity_t(efpPartitionNumber_t pn, efpDataSize_kib_t ds) : pn_(pn), ds_(ds) {}
efpIdentity_t(const efpIdentity_t& ei) : pn_(ei.pn_), ds_(ei.ds_) {}
- friend std::ostream& operator<<(std::ostream& os, efpIdentity_t& id) { os << "[" << id.pn_ << "," << id.ds_ << "]"; return os; }
+ friend std::ostream& operator<<(std::ostream& os, const efpIdentity_t& id) {
+ // This two-stage write allows this << operator to be used with std::setw() for formatted writes
+ std::ostringstream oss;
+ oss << id.pn_ << "," << id.ds_;
+ os << oss.str();
+ return os;
+ }
} efpIdentity_t;
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
index 8d43aff8c5..e6e07dc8e2 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
@@ -96,7 +96,7 @@ uint64_t LinearFileController::getNextRecordId() {
void LinearFileController::removeFileToEfp(const std::string& fileName) {
if (emptyFilePoolPtr_) {
- emptyFilePoolPtr_->returnEmptyFile(fileName);
+ emptyFilePoolPtr_->returnEmptyFileSymlink(fileName);
}
}
@@ -108,7 +108,7 @@ void LinearFileController::restoreEmptyFile(const std::string& fileName) {
void LinearFileController::purgeEmptyFilesToEfp() {
slock l(journalFileListMutex_);
while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
- emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+ emptyFilePoolPtr_->returnEmptyFileSymlink(journalFileList_.front()->getFqFileName());
delete journalFileList_.front();
journalFileList_.pop_front();
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 47aa2f634e..3f39913422 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -322,36 +322,7 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
}
}
-std::string RecoveryManager::toString(const std::string& jid) {
- std::ostringstream oss;
- oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
- oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
- oss << " Journal File List:" << std::endl;
- for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
- std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
- oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
- }
- oss << " Enqueue Counts: [ ";
- for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
- if (l != fileNumberMap_.begin()) {
- oss << ", ";
- }
- oss << l->second->journalFilePtr_->getEnqueuedRecordCount();
- }
- oss << " ]" << std::endl;
- oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
- std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" <<
- (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
- oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
- oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << " Enqueued records (txn & non-txn):" << std::endl;
- return oss.str();
-}
-
-std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
+std::string RecoveryManager::toString(const std::string& jid, const uint16_t indent) const {
std::string indentStr(indent, ' ');
std::ostringstream oss;
oss << std::endl << indentStr << "Journal recovery analysis (jid=\"" << jid << "\"):" << std::endl;
@@ -360,18 +331,17 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
} else {
oss << indentStr << std::setw(7) << "file_id"
<< std::setw(43) << "file_name"
- << std::setw(16) << "fro"
<< std::setw(12) << "record_cnt"
- << std::setw(5) << "ptn"
- << std::setw(10) << "efp"
+ << std::setw(16) << "fro"
+ << std::setw(12) << "efp_id"
<< std::endl;
oss << indentStr << std::setw(7) << "-------"
<< std::setw(43) << "-----------------------------------------"
+ << std::setw(12) << "----------"
<< std::setw(16) << "--------------"
<< std::setw(12) << "----------"
- << std::setw(5) << "---"
- << std::setw(10) << "--------"
<< std::endl;
+ uint32_t totalRecordCount(0UL);
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
std::ostringstream fid;
@@ -380,19 +350,20 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset();
oss << indentStr << std::setw(7) << fid.str()
<< std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
- << std::setw(16) << fro.str()
<< std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount()
- << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_
- << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k"
+ << std::setw(16) << fro.str()
+ << std::setw(12) << k->second->journalFilePtr_->getEfpIdentity()
<< std::endl;
+ totalRecordCount += k->second->journalFilePtr_->getEnqueuedRecordCount();
}
+ oss << indentStr << std::setw(62) << "----------" << std::endl;
+ oss << indentStr << std::setw(62) << totalRecordCount << std::endl;
oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
oss << indentStr << "End offset in last file = 0x" << std::hex << endOffset_ << std::dec << " (" <<
(endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
- //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report
}
return oss.str();
}
@@ -942,7 +913,7 @@ bool RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
while (fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second;
- emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName());
+ emptyFilePoolPtr->returnEmptyFileSymlink(rfdp->journalFilePtr_->getFqFileName());
delete rfdp->journalFilePtr_;
delete rfdp;
fileNumberMap_.erase(fileNumberMap_.begin()->first);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
index eeda4630f3..55cc6f8329 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
@@ -125,8 +125,7 @@ public:
void recoveryComplete();
void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
LinearFileController* lfcPtr);
- std::string toString(const std::string& jid);
- std::string toLog(const std::string& jid, const int indent);
+ std::string toString(const std::string& jid, const uint16_t indent) const;
protected:
void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
void checkFileStreamOk(bool checkEof);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
index 902b22dc71..cc31f2e1df 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
@@ -119,7 +119,7 @@ jcntl::recover(EmptyFilePoolManager* efpmp,
assert(_emptyFilePoolPtr != 0);
highest_rid = _recoveryManager.getHighestRecordId();
- _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5));
+ _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, 5U));
_linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
_recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
if (_recoveryManager.isLastFileFull()) {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
index 81f96677f4..e1602cf961 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
@@ -112,6 +112,7 @@ const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02;
const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME = 0x0d03;
const uint32_t jerrno::JERR_EFP_NOEFP = 0x0d04;
const uint32_t jerrno::JERR_EFP_EMPTY = 0x0d05;
+const uint32_t jerrno::JERR_EFP_SYMLINK = 0x0d06;
// Negative returns for some functions
const int32_t jerrno::AIO_TIMEOUT = -1;
@@ -206,6 +207,7 @@ jerrno::__init()
_err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory";
_err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size";
_err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty";
+ _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation failed";
//_err_map[] = "";
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
index e5cd7e8227..36410f9844 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
@@ -130,6 +130,7 @@ namespace journal {
static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory
static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size
static const uint32_t JERR_EFP_EMPTY; ///< EFP empty
+ static const uint32_t JERR_EFP_SYMLINK; ///< Symbolic Link operation failed
// Negative returns for some functions
static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return