summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/linearstore/ISSUES43
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp11
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp162
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h24
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp66
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp62
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h9
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp4
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp51
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h3
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp2
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.h1
16 files changed, 273 insertions, 179 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index da169f57d6..1c50f7e2cb 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -25,31 +25,24 @@ Current/pending:
------ ------- ----------------------
5359 - Linearstore: Implement new management schema and wire into store
5360 - Linearstore: Evaluate and rework logging to produce a consistent log output
- 5361 - Linearstore: No tests for linearstore functionality currently exist
+ 5361 1145359 Linearstore: No tests for linearstore functionality currently exist
svn r.1564893 2014-02-05: Added tx-test-soak.sh
svn r.1564935 2014-02-05: Added license text to tx-test-soak.sh
+ svn r.1625283 2014-09-16: Basic python tests from legacystore ported over to linearstore
* No existing tests for linearstore:
** Basic broker-level tests for txn and non-txn recovery
** Store-level tests which check write boundary conditions
** EFP tests, including file recovery, error management
** Unit tests
** Basic performance tests
- 5362 - Linearstore: No store tools exist for examining the journals
- svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
- svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
- svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
- svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
- svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze
- svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
- * Store analysis and status
- * Recovery/reading of message content
- * Empty file pool status and management
5464 - [linearstore] Incompletely created journal files accumulate in EFP
- - 1088944 [Linearstore] store does not return all files to EFP after purging big queue
- - 1078937 [linearstore] Installation and tests for new store analysis tool qpid-qls-analyze
- svn r.1596633 2014-05-21: Modified to run from installed location
-* 6043 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation
-* - 1089652 [RFE]: Configuration option for linear store to delete the used journal files instead of recycling them.
+ - 1088944 [Linearstore] store does not return all files to EFP after purging big queue <queue purge issue>
+ 6043 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation
+ - 1067480 [LinearStore] Provide a way to limit max count/size of empty files in EFP
+ - 1067429 [LinearStore] last file from deleted queue is not moved to EFP <queue delete issue>
+ - 1067482 [LinearStore] Provide a way to prealocate empty pages in EFP
+ 5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs
+
Fixed/closed:
@@ -118,12 +111,24 @@ NO-JIRA - Added missing Apache copyright/license text
5651 - [C++ broker] segfault in qpid::linearstore::journal::jdir::clear_dir when declaring durable queue
svn r.1582730 2014-03-28 Proposed fix by Pavel Moravec
* Bug introduced by r.1578899.
+ 5362 1145363 Linearstore: No store tools exist for examining the journals
+ svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+ svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
+ svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
+ * Store analysis and status
+ * Recovery/reading of message content
+ * Empty file pool status and management
5661 - [linearstore] Set default cmake build to exclude linearstore
svn r.1584379 2014-04-03 Proposed solution.
* Run ccmake, select BUILD_LINEARSTORE to change its value to ON to build.
5750 1078142 [linearstore] qpidd closes connection with (distributed) transactional client while checking previous transaction, broker signals error (closed by error: Queue Ve0-2: async_dequeue() failed: exception 0x0103 wmgr::get_events() threw JERR__AIO: AIO error)
svn r.1594215 2014-05-13 Proposed solution.
* jexception 0x0103 wmgr::get_events() threw JERR__AIO: AIO error. (AIO write operation failed: Invalid argument (-22) [pg=0 size=8192 offset=4096 fh=22])
+ 5655 1078937 [linearstore] Installation and tests for new store analysis tool qpid-qls-analyze
+ svn r.1596633 2014-05-21: Modified to run from installed location
5767 1098118 [linearstore] broker segfaults when recovering journal file with damaged header
svn r.1596509 2014-05-21 Proposed solution (committed by pmoravec)
svn r.1599243 2014-06-02 Solution to additional case of file header corruption
@@ -134,6 +139,10 @@ NO-JIRA - Added missing Apache copyright/license text
This turned out to be an AMQP error, fix does not affect store code.
6043 1089652 [RFE]: Configuration option for linear store to delete or overwrite the used journal files.
svn r.1620426 2014-08-25 Proposed solution
+ 6147 1152012 [C++ broker linearstore] missing journal id in "trace Mgmt create journal." log
+ svn r.1631360 2014-10-13 Proposed solution
+ 6157 1150397 linearstore: segfault when 2 journals request new journal file from empty EFP
+ svn r.1632504 2014-10-17 Proposed solution by pmoravec
Ordered checkin list:
@@ -175,6 +184,8 @@ no. svn r Q-JIRA RHBZ Date Alt Committer
30. 1599243 5767 1098118 2014-06-02
31. 1614665 5924 1124906 2014-07-30
32. 1620426 6043 1089652 2014-08-25
+33. 1631360 6147 1152012 2014-10-13 (pmoravec)
+34. 1632504 6157 1150397 2014-10-17 (pmoravec)
See above sections for details on these checkins.
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 8e4c6e38e1..a5dd63da62 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -201,8 +201,7 @@ bool MessageStoreImpl::init(const std::string& storeDir_,
if (truncateFlag_)
truncateInit();
- else
- init();
+ init(truncateFlag_);
QLS_LOG(info, "Store module initialized; store-dir=" << storeDir_);
QLS_LOG(info, "> Default EFP partition: " << defaultEfpPartitionNumber);
@@ -218,7 +217,7 @@ bool MessageStoreImpl::init(const std::string& storeDir_,
return isInit;
}
-void MessageStoreImpl::init()
+void MessageStoreImpl::init(const bool truncateFlag)
{
const int retryMax = 3;
int bdbRetryCnt = 0;
@@ -296,6 +295,7 @@ void MessageStoreImpl::init()
defaultEfpPartitionNumber,
defaultEfpFileSize_kib,
overwriteBeforeReturnFlag,
+ truncateFlag,
jrnlLog));
efpMgr->findEfpPartitions();
}
@@ -337,13 +337,12 @@ void MessageStoreImpl::truncateInit()
isInit = false;
}
- // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s).
-
qpid::linearstore::journal::jdir::delete_dir(getBdbBaseDir());
+
+ // TODO: Linearstore: harvest all discarded journal files into the empty file pool(s).
qpid::linearstore::journal::jdir::delete_dir(getJrnlBaseDir());
qpid::linearstore::journal::jdir::delete_dir(getTplBaseDir());
QLS_LOG(info, "Store directory " << getStoreTopLevelDir() << " was truncated.");
- init();
}
void MessageStoreImpl::chkTplStoreInit()
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index 51e02ff395..236fcf2cf8 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -157,7 +157,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
static qpid::linearstore::journal::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKiB,
const std::string& paramName);
- void init();
+ void init(const bool truncateFlag);
void recoverQueues(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
index a69ab09001..371443f46b 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
@@ -33,37 +33,53 @@
#include <unistd.h>
#include <vector>
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
+// static
+std::string EmptyFilePool::s_inuseFileDirectory_ = "in_use";
+
+// static
+std::string EmptyFilePool::s_returnedFileDirectory_ = "returned";
+
EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
const EmptyFilePoolPartition* partitionPtr,
+ const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef) :
efpDirectory_(efpDirectory),
efpDataSize_kib_(dataSizeFromDirName_kib(efpDirectory, partitionPtr->getPartitionNumber())),
partitionPtr_(partitionPtr),
+ overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+ truncateFlag_(truncateFlag),
journalLogRef_(journalLogRef)
{}
EmptyFilePool::~EmptyFilePool() {}
void EmptyFilePool::initialize() {
+//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition " << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ << std::endl; // DEBUG
std::vector<std::string> dirList;
+
+ // Process empty files in main dir
jdir::read_dir(efpDirectory_, dirList, false, true, false, false);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
size_t dotPos = i->rfind(".");
if (dotPos != std::string::npos) {
if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
- std::string emptyFile(efpDirectory_ + "/" + (*i));
- if (validateEmptyFile(emptyFile)) {
- pushEmptyFile(emptyFile);
+ std::string emptyFileName(efpDirectory_ + "/" + (*i));
+ if (validateEmptyFile(emptyFileName)) {
+ pushEmptyFile(emptyFileName);
}
}
}
}
+
+ // Create 'in_use' and 'returned' subdirs if they don't already exist
+ // Retern files to EFP in 'in_use' and 'returned' subdirs if they do exist
+ initializeSubDirectory(efpDirectory_ + "/" + s_inuseFileDirectory_);
+ initializeSubDirectory(efpDirectory_ + "/" + s_returnedFileDirectory_);
}
efpDataSize_kib_t EmptyFilePool::dataSize_kib() const {
@@ -106,36 +122,29 @@ const efpIdentity_t EmptyFilePool::getIdentity() const {
std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
std::string emptyFileName = popEmptyFile();
- std::string newFileName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
- if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+ std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/'));
+ std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
+ if (moveFile(emptyFileName, newFileName)) {
// Try again with new UUID for file name
- newFileName = destDirectory + "/" + getEfpFileName();
- if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+ newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName();
+ if (moveFile(emptyFileName, newFileName)) {
+//std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG
pushEmptyFile(emptyFileName);
std::ostringstream oss;
oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
}
}
- return newFileName;
+ if (createSymLink(newFileName, symlinkName)) {
+ std::ostringstream oss;
+ oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile");
+ }
+ return symlinkName;
}
-void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile) {
- std::string emptyFileName(efpDirectory_ + fqSrcFile.substr(fqSrcFile.rfind('/'))); // NOTE: substr() includes leading '/'
- if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
- // Try again with new UUID for file name
- emptyFileName = efpDirectory_ + "/" + getEfpFileName();
- if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
- // Failed twice in a row - delete file
- ::unlink(fqSrcFile.c_str());
- return;
- }
- }
- resetEmptyFileHeader(emptyFileName);
- if (partitionPtr_->getOverwriteBeforeReturnFlag()) {
- overwriteFileContents(emptyFileName);
- }
- pushEmptyFile(emptyFileName);
+void EmptyFilePool::returnEmptyFileSymlink(const std::string& emptyFileSymlink) {
+ returnEmptyFile(deleteSymlink(emptyFileSymlink));
}
//static
@@ -173,12 +182,12 @@ efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirN
// --- protected functions ---
-// WARNING: this method needs to be called under the scope of emptyFileListMutex_ lock
-void EmptyFilePool::createEmptyFile() {
+std::string EmptyFilePool::createEmptyFile() {
std::string efpfn = getEfpFileName();
- if (overwriteFileContents(efpfn)) {
- emptyFileList_.push_back(efpfn);
+ if (!overwriteFileContents(efpfn)) {
+ // TODO: handle failure to prepare new file here
}
+ return efpfn;
}
std::string EmptyFilePool::getEfpFileName() {
@@ -188,6 +197,27 @@ std::string EmptyFilePool::getEfpFileName() {
return oss.str();
}
+void EmptyFilePool::initializeSubDirectory(const std::string& fqDirName) {
+ std::vector<std::string> dirList;
+ if (jdir::exists(fqDirName)) {
+ if (truncateFlag_) {
+ jdir::read_dir(fqDirName, dirList, false, true, false, false);
+ for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+ size_t dotPos = i->rfind(".");
+ if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
+ returnEmptyFile(fqDirName + "/" + (*i));
+ } else {
+ std::ostringstream oss;
+ oss << "File \'" << *i << "\' was not a journal file and was not returned to EFP.";
+ journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+ }
+ }
+ }
+ } else {
+ jdir::create_dir(fqDirName);
+ }
+}
+
bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
::file_hdr_t fh;
::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
@@ -199,22 +229,27 @@ bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
ofs.put('\0');
ofs.close();
return true;
-//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
+//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
} else {
-//std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
+//std::cerr << "*** ERROR: Unable to open file \"" << fqFileName << "\"" << std::endl; // DEBUG
}
return false;
}
std::string EmptyFilePool::popEmptyFile() {
std::string emptyFileName;
+ bool listEmptyFlag;
{
slock l(emptyFileListMutex_);
- if (emptyFileList_.empty()) {
- createEmptyFile();
+ listEmptyFlag = emptyFileList_.empty();
+ if (!listEmptyFlag) {
+ emptyFileName = emptyFileList_.front();
+ emptyFileList_.pop_front();
}
- emptyFileName = emptyFileList_.front();
- emptyFileList_.pop_front();
+ }
+ // If the list is empty, create a new file and return the file name.
+ if (listEmptyFlag) {
+ emptyFileName = createEmptyFile();
}
return emptyFileName;
}
@@ -224,6 +259,28 @@ void EmptyFilePool::pushEmptyFile(const std::string fqFileName) {
emptyFileList_.push_back(fqFileName);
}
+void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) {
+ std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
+ if (moveFile(emptyFileName, returnedFileName)) {
+ ::unlink(emptyFileName.c_str());
+//std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " << returnedFileName << "; deleted." << std::endl; // DEBUG
+ }
+
+ // TODO: On a separate thread, process returned files by overwriting headers and, optionally, their contents and
+ // returning them to the EFP directory
+ resetEmptyFileHeader(returnedFileName);
+ if (overwriteBeforeReturnFlag_) {
+ overwriteFileContents(returnedFileName);
+ }
+ std::string sanitizedEmptyFileName = efpDirectory_ + returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() includes leading '/'
+ if (moveFile(returnedFileName, sanitizedEmptyFileName)) {
+ ::unlink(returnedFileName.c_str());
+//std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to " << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG
+ } else {
+ pushEmptyFile(sanitizedEmptyFileName);
+ }
+}
+
void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) {
std::fstream fs(fqFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
if (fs.good()) {
@@ -239,14 +296,14 @@ void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) {
fs.write(buff, buffsize);
std::streampos bytesWritten = fs.tellp();
if (std::streamoff(bytesWritten) != buffsize) {
-//std::cerr << "ERROR: Unable to write file header of file \"" << fqFileName_ << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl;
+//std::cerr << "*** ERROR: Unable to write file header of file \"" << fqFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl; // DEBUG
}
} else {
-//std::cerr << "ERROR: Unable to read file header of file \"" << fqFileName_ << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl;
+//std::cerr << "*** ERROR: Unable to read file header of file \"" << fqFileName << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl; // DEBUG
}
fs.close();
} else {
-//std::cerr << "ERROR: Unable to open file \"" << fqFileName_ << "\" for reading" << std::endl; // DEBUG
+//std::cerr << "*** ERROR: Unable to open file \"" << fqFileName << "\" for reading" << std::endl; // DEBUG
}
}
@@ -329,7 +386,7 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
}
// static
-int EmptyFilePool::moveEmptyFile(const std::string& from,
+int EmptyFilePool::moveFile(const std::string& from,
const std::string& to) {
if (::rename(from.c_str(), to.c_str())) {
if (errno == EEXIST) return errno; // File name exists
@@ -340,4 +397,29 @@ int EmptyFilePool::moveEmptyFile(const std::string& from,
return 0;
}
+//static
+int EmptyFilePool::createSymLink(const std::string& fqFileName,
+ const std::string& fqLinkName) {
+ if(::symlink(fqFileName.c_str(), fqLinkName.c_str())) {
+ if (errno == EEXIST) return errno; // File name exists
+ std::ostringstream oss;
+ oss << "file=\"" << fqFileName << "\" symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "createSymLink");
+ }
+ return 0;
+}
+
+//static
+std::string EmptyFilePool::deleteSymlink(const std::string& fqLinkName) {
+ char buff[1024];
+ ssize_t len = ::readlink(fqLinkName.c_str(), buff, 1024);
+ if (len < 0) {
+ std::ostringstream oss;
+ oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink");
+ }
+ ::unlink(fqLinkName.c_str());
+ return std::string(buff, len);
+}
+
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
index 3a2c93f769..dbc3992f46 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
@@ -44,11 +44,16 @@ class EmptyFilePool
{
protected:
typedef std::deque<std::string> emptyFileList_t;
- typedef emptyFileList_t::iterator emptyFileListItr_t;
+ typedef emptyFileList_t::const_iterator emptyFileListConstItr_t;
+
+ static std::string s_inuseFileDirectory_;
+ static std::string s_returnedFileDirectory_;
const std::string efpDirectory_;
const efpDataSize_kib_t efpDataSize_kib_;
const EmptyFilePoolPartition* partitionPtr_;
+ const bool overwriteBeforeReturnFlag_;
+ const bool truncateFlag_;
JournalLog& journalLogRef_;
private:
@@ -58,6 +63,8 @@ private:
public:
EmptyFilePool(const std::string& efpDirectory,
const EmptyFilePoolPartition* partitionPtr,
+ const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef);
virtual ~EmptyFilePool();
@@ -73,23 +80,28 @@ public:
const efpIdentity_t getIdentity() const;
std::string takeEmptyFile(const std::string& destDirectory);
- void returnEmptyFile(const std::string& srcFile);
+ void returnEmptyFileSymlink(const std::string& emptyFileSymlink);
static std::string dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib);
static efpDataSize_kib_t dataSizeFromDirName_kib(const std::string& dirName,
const efpPartitionNumber_t partitionNumber);
protected:
- void createEmptyFile();
+ std::string createEmptyFile();
std::string getEfpFileName();
- std::string popEmptyFile();
+ void initializeSubDirectory(const std::string& fqDirName);
bool overwriteFileContents(const std::string& fqFileName);
+ std::string popEmptyFile();
void pushEmptyFile(const std::string fqFileName);
+ void returnEmptyFile(const std::string& emptyFileName);
void resetEmptyFileHeader(const std::string& fqFileName);
bool validateEmptyFile(const std::string& emptyFileName) const;
- static int moveEmptyFile(const std::string& fromFqPath,
- const std::string& toFqPath);
+ static int moveFile(const std::string& fromFqPath,
+ const std::string& toFqPath);
+ static int createSymLink(const std::string& fqFileName,
+ const std::string& fqLinkName);
+ static std::string deleteSymlink(const std::string& fqLinkName);
};
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
index e5fd44bd2d..28e1b0b56e 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
@@ -27,8 +27,6 @@
#include "qpid/linearstore/journal/JournalLog.h"
#include "qpid/linearstore/journal/slock.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
@@ -37,11 +35,13 @@ EmptyFilePoolManager::EmptyFilePoolManager(const std::string& qlsStorePath,
const efpPartitionNumber_t defaultPartitionNumber,
const efpDataSize_kib_t defaultEfpDataSize_kib,
const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef) :
qlsStorePath_(qlsStorePath),
defaultPartitionNumber_(defaultPartitionNumber),
defaultEfpDataSize_kib_(defaultEfpDataSize_kib),
overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+ truncateFlag_(truncateFlag),
journalLogRef_(journalLogRef)
{}
@@ -63,20 +63,7 @@ void EmptyFilePoolManager::findEfpPartitions() {
efpPartitionNumber_t pn = EmptyFilePoolPartition::getPartitionNumber(*i);
if (pn > 0) { // valid partition name found
std::string fullDirPath(qlsStorePath_ + "/" + (*i));
- EmptyFilePoolPartition* efppp = 0;
- try {
- efppp = new EmptyFilePoolPartition(pn, fullDirPath, overwriteBeforeReturnFlag_, journalLogRef_);
- {
- slock l(partitionMapMutex_);
- partitionMap_[pn] = efppp;
- }
- } catch (const std::exception& e) {
- if (efppp != 0) {
- delete efppp;
- efppp = 0;
- }
-//std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl; // DEBUG
- }
+ EmptyFilePoolPartition* efppp = insertPartition(pn, fullDirPath);
if (efppp != 0)
efppp->findEmptyFilePools();
foundPartition = true;
@@ -85,37 +72,28 @@ void EmptyFilePoolManager::findEfpPartitions() {
// If no partition was found, create an empty default partition.
if (!foundPartition) {
- journalLogRef_.log(JournalLog::LOG_INFO, "No EFP partition found, creating an empty partition.");
- std::ostringstream oss;
- oss << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
- << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
- jdir::create_dir(oss.str());
+ std::ostringstream oss1;
+ oss1 << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
+ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
+ jdir::create_dir(oss1.str());
+ insertPartition(defaultPartitionNumber_, oss1.str());
+ std::ostringstream oss2;
+ oss2 << "No EFP partition found, creating an empty partition at " << oss1.str();
+ journalLogRef_.log(JournalLog::LOG_INFO, oss2.str());
}
}
journalLogRef_.log(JournalLog::LOG_INFO, "EFP Manager initialization complete");
std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*> partitionList;
- std::vector<qpid::linearstore::journal::EmptyFilePool*> filePoolList;
getEfpPartitions(partitionList);
if (partitionList.size() == 0) {
journalLogRef_.log(JournalLog::LOG_WARN, "NO EFP PARTITIONS FOUND! No queue creation is possible.");
} else {
std::stringstream oss;
- oss << "> EFP Partitions found: " << partitionList.size();
+ oss << "EFP Partitions found: " << partitionList.size();
journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
for (std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
- filePoolList.clear();
- (*i)->getEmptyFilePools(filePoolList);
- std::stringstream oss;
- oss << " * Partition " << (*i)->getPartitionNumber() << " containing " << filePoolList.size()
- << " pool" << (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->getPartitionDirectory() << "\'";
- journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
- for (std::vector<qpid::linearstore::journal::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
- std::ostringstream oss;
- oss << " - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
- " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB";
- journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
- }
+ journalLogRef_.log(JournalLog::LOG_INFO, (*i)->toString(5U));
}
}
}
@@ -210,4 +188,22 @@ uint16_t EmptyFilePoolManager::getNumEfpPartitions() const {
return partitionMap_.size();
}
+EmptyFilePoolPartition* EmptyFilePoolManager::insertPartition(const efpPartitionNumber_t pn, const std::string& fullPartitionPath) {
+ EmptyFilePoolPartition* efppp = 0;
+ try {
+ efppp = new EmptyFilePoolPartition(pn, fullPartitionPath, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
+ {
+ slock l(partitionMapMutex_);
+ partitionMap_[pn] = efppp;
+ }
+ } catch (const std::exception& e) {
+ if (efppp != 0) {
+ delete efppp;
+ efppp = 0;
+ }
+//std::cerr << "*** Unable to initialize partition " << pn << " (\'" << fullPartitionPath << "\'): " << e.what() << std::endl; // DEBUG
+ }
+ return efppp;
+}
+
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
index d7d08dcd25..d0aa7fa7d6 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
@@ -46,6 +46,7 @@ protected:
const efpPartitionNumber_t defaultPartitionNumber_;
const efpDataSize_kib_t defaultEfpDataSize_kib_;
const bool overwriteBeforeReturnFlag_;
+ const bool truncateFlag_;
JournalLog& journalLogRef_;
partitionMap_t partitionMap_;
smutex partitionMapMutex_;
@@ -55,6 +56,7 @@ public:
const efpPartitionNumber_t defaultPartitionNumber,
const efpDataSize_kib_t defaultEfpDataSize_kib,
const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef_);
virtual ~EmptyFilePoolManager();
@@ -72,6 +74,8 @@ public:
void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
const efpPartitionNumber_t efpPartitionNumber = 0);
uint16_t getNumEfpPartitions() const;
+protected:
+ EmptyFilePoolPartition* insertPartition(const efpPartitionNumber_t pn, const std::string& fullPartitionPath);
};
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
index f80268b0e6..c10caebc6f 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
@@ -26,22 +26,19 @@
#include "qpid/linearstore/journal/jdir.h"
#include "qpid/linearstore/journal/slock.h"
-//#include <iostream> // DEBUG
-
namespace qpid {
namespace linearstore {
namespace journal {
-// static
-const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition
-
EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
const std::string& partitionDir,
const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef) :
partitionNum_(partitionNum),
partitionDir_(partitionDir),
overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+ truncateFlag_(truncateFlag),
journalLogRef_(journalLogRef)
{
validatePartitionDir();
@@ -57,25 +54,13 @@ EmptyFilePoolPartition::~EmptyFilePoolPartition() {
void
EmptyFilePoolPartition::findEmptyFilePools() {
-//std::cout << "Reading " << partitionDir << std::endl; // DEBUG
+//std::cout << "*** Reading " << partitionDir_ << std::endl; // DEBUG
std::vector<std::string> dirList;
- jdir::read_dir(partitionDir_, dirList, true, false, false, false);
- bool foundEfpDir = false;
- for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
- if (i->compare(s_efpTopLevelDir_) == 0) {
- foundEfpDir = true;
- break;
- }
- }
- if (foundEfpDir) {
- std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_);
-//std::cout << "Reading " << efpDir << std::endl; // DEBUG
- dirList.clear();
- jdir::read_dir(efpDir, dirList, true, false, false, true);
+ jdir::read_dir(partitionDir_, dirList, true, false, false, true);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
EmptyFilePool* efpp = 0;
try {
- efpp = new EmptyFilePool(*i, this, journalLogRef_);
+ efpp = new EmptyFilePool(*i, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
{
slock l(efpMapMutex_);
efpMap_[efpp->dataSize_kib()] = efpp;
@@ -88,13 +73,14 @@ EmptyFilePoolPartition::findEmptyFilePools() {
}
//std::cerr << "WARNING: " << e.what() << std::endl;
}
- if (efpp != 0)
+ if (efpp != 0) {
efpp->initialize();
+ }
}
- }
}
EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) {
+ slock l(efpMapMutex_);
efpMapItr_t i = efpMap_.find(efpDataSize_kib);
if (i == efpMap_.end())
return 0;
@@ -102,21 +88,19 @@ EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t
}
void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
+ slock l(efpMapMutex_);
for (efpMapItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
efpList.push_back(i->second);
}
}
void EmptyFilePoolPartition::getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList_kib) const {
+ slock l(efpMapMutex_);
for (efpMapConstItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
efpDataSizesList_kib.push_back(i->first);
}
}
-bool EmptyFilePoolPartition::getOverwriteBeforeReturnFlag() const {
- return overwriteBeforeReturnFlag_;
-}
-
std::string EmptyFilePoolPartition::getPartitionDirectory() const {
return partitionDir_;
}
@@ -125,6 +109,32 @@ efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber() const {
return partitionNum_;
}
+std::string EmptyFilePoolPartition::toString(const uint16_t indent) const {
+ std::string indentStr(indent, ' ');
+ std::stringstream oss;
+ oss << "EFP Partition " << partitionNum_ << ":" << std::endl;
+ oss << indentStr << "EFP Partition Analysis (partition " << partitionNum_ << " at \"" << partitionDir_ << "\"):" << std::endl;
+ if (efpMap_.empty()) {
+ oss << indentStr << "<Partition empty, no EFPs found>" << std::endl;
+ } else {
+ oss << indentStr << std::setw(12) << "efp_size_kib"
+ << std::setw(12) << "num_files"
+ << std::setw(18) << "tot_capacity_kib" << std::endl;
+ oss << indentStr << std::setw(12) << "------------"
+ << std::setw(12) << "----------"
+ << std::setw(18) << "----------------" << std::endl;
+ {
+ slock l(efpMapMutex_);
+ for (efpMapConstItr_t i=efpMap_.begin(); i!= efpMap_.end(); ++i) {
+ oss << indentStr << std::setw(12) << i->first
+ << std::setw(12) << i->second->numEmptyFiles()
+ << std::setw(18) << i->second->cumFileSize_kib() << std::endl;
+ }
+ }
+ }
+ return oss.str();
+}
+
// static
std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNumber_t partitionNumber) {
std::ostringstream oss;
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
index 7eb9f70d52..c653c6be6a 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
@@ -37,8 +37,6 @@ class JournalLog;
class EmptyFilePoolPartition
{
-public:
- static const std::string s_efpTopLevelDir_;
protected:
typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t;
typedef efpMap_t::iterator efpMapItr_t;
@@ -47,6 +45,7 @@ protected:
const efpPartitionNumber_t partitionNum_;
const std::string partitionDir_;
const bool overwriteBeforeReturnFlag_;
+ const bool truncateFlag_;
JournalLog& journalLogRef_;
efpMap_t efpMap_;
smutex efpMapMutex_;
@@ -55,6 +54,7 @@ public:
EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
const std::string& partitionDir,
const bool overwriteBeforeReturnFlag,
+ const bool truncateFlag,
JournalLog& journalLogRef);
virtual ~EmptyFilePoolPartition();
@@ -62,9 +62,9 @@ public:
EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const;
- bool getOverwriteBeforeReturnFlag() const;
std::string getPartitionDirectory() const;
efpPartitionNumber_t getPartitionNumber() const;
+ std::string toString(const uint16_t indent) const;
static std::string getPartionDirectoryName(const efpPartitionNumber_t partitionNumber);
static efpPartitionNumber_t getPartitionNumber(const std::string& name);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
index 14213f7955..4cae4e6538 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
@@ -23,6 +23,7 @@
#define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLTYPES_H_
#include <iostream>
+#include <sstream>
#include <stdint.h>
namespace qpid {
@@ -42,7 +43,13 @@ typedef struct efpIdentity_t {
efpIdentity_t() : pn_(0), ds_(0) {}
efpIdentity_t(efpPartitionNumber_t pn, efpDataSize_kib_t ds) : pn_(pn), ds_(ds) {}
efpIdentity_t(const efpIdentity_t& ei) : pn_(ei.pn_), ds_(ei.ds_) {}
- friend std::ostream& operator<<(std::ostream& os, efpIdentity_t& id) { os << "[" << id.pn_ << "," << id.ds_ << "]"; return os; }
+ friend std::ostream& operator<<(std::ostream& os, const efpIdentity_t& id) {
+ // This two-stage write allows this << operator to be used with std::setw() for formatted writes
+ std::ostringstream oss;
+ oss << id.pn_ << "," << id.ds_;
+ os << oss.str();
+ return os;
+ }
} efpIdentity_t;
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
index 8d43aff8c5..e6e07dc8e2 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
@@ -96,7 +96,7 @@ uint64_t LinearFileController::getNextRecordId() {
void LinearFileController::removeFileToEfp(const std::string& fileName) {
if (emptyFilePoolPtr_) {
- emptyFilePoolPtr_->returnEmptyFile(fileName);
+ emptyFilePoolPtr_->returnEmptyFileSymlink(fileName);
}
}
@@ -108,7 +108,7 @@ void LinearFileController::restoreEmptyFile(const std::string& fileName) {
void LinearFileController::purgeEmptyFilesToEfp() {
slock l(journalFileListMutex_);
while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
- emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+ emptyFilePoolPtr_->returnEmptyFileSymlink(journalFileList_.front()->getFqFileName());
delete journalFileList_.front();
journalFileList_.pop_front();
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 47aa2f634e..3f39913422 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -322,36 +322,7 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
}
}
-std::string RecoveryManager::toString(const std::string& jid) {
- std::ostringstream oss;
- oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
- oss << " Number of journal files = " << fileNumberMap_.size() << std::endl;
- oss << " Journal File List:" << std::endl;
- for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
- std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
- oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
- }
- oss << " Enqueue Counts: [ ";
- for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
- if (l != fileNumberMap_.begin()) {
- oss << ", ";
- }
- oss << l->second->journalFilePtr_->getEnqueuedRecordCount();
- }
- oss << " ]" << std::endl;
- oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << " First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
- std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " End offset = 0x" << std::hex << endOffset_ << std::dec << " (" <<
- (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
- oss << " Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
- oss << " Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
- oss << " Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
- oss << " Enqueued records (txn & non-txn):" << std::endl;
- return oss.str();
-}
-
-std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
+std::string RecoveryManager::toString(const std::string& jid, const uint16_t indent) const {
std::string indentStr(indent, ' ');
std::ostringstream oss;
oss << std::endl << indentStr << "Journal recovery analysis (jid=\"" << jid << "\"):" << std::endl;
@@ -360,18 +331,17 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
} else {
oss << indentStr << std::setw(7) << "file_id"
<< std::setw(43) << "file_name"
- << std::setw(16) << "fro"
<< std::setw(12) << "record_cnt"
- << std::setw(5) << "ptn"
- << std::setw(10) << "efp"
+ << std::setw(16) << "fro"
+ << std::setw(12) << "efp_id"
<< std::endl;
oss << indentStr << std::setw(7) << "-------"
<< std::setw(43) << "-----------------------------------------"
+ << std::setw(12) << "----------"
<< std::setw(16) << "--------------"
<< std::setw(12) << "----------"
- << std::setw(5) << "---"
- << std::setw(10) << "--------"
<< std::endl;
+ uint32_t totalRecordCount(0UL);
for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
std::ostringstream fid;
@@ -380,19 +350,20 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset();
oss << indentStr << std::setw(7) << fid.str()
<< std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
- << std::setw(16) << fro.str()
<< std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount()
- << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_
- << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k"
+ << std::setw(16) << fro.str()
+ << std::setw(12) << k->second->journalFilePtr_->getEfpIdentity()
<< std::endl;
+ totalRecordCount += k->second->journalFilePtr_->getEnqueuedRecordCount();
}
+ oss << indentStr << std::setw(62) << "----------" << std::endl;
+ oss << indentStr << std::setw(62) << totalRecordCount << std::endl;
oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
oss << indentStr << "End offset in last file = 0x" << std::hex << endOffset_ << std::dec << " (" <<
(endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
- //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report
}
return oss.str();
}
@@ -942,7 +913,7 @@ bool RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
while (fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second;
- emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName());
+ emptyFilePoolPtr->returnEmptyFileSymlink(rfdp->journalFilePtr_->getFqFileName());
delete rfdp->journalFilePtr_;
delete rfdp;
fileNumberMap_.erase(fileNumberMap_.begin()->first);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
index eeda4630f3..55cc6f8329 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
@@ -125,8 +125,7 @@ public:
void recoveryComplete();
void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
LinearFileController* lfcPtr);
- std::string toString(const std::string& jid);
- std::string toLog(const std::string& jid, const int indent);
+ std::string toString(const std::string& jid, const uint16_t indent) const;
protected:
void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
void checkFileStreamOk(bool checkEof);
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
index 902b22dc71..cc31f2e1df 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
@@ -119,7 +119,7 @@ jcntl::recover(EmptyFilePoolManager* efpmp,
assert(_emptyFilePoolPtr != 0);
highest_rid = _recoveryManager.getHighestRecordId();
- _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5));
+ _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, 5U));
_linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
_recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
if (_recoveryManager.isLastFileFull()) {
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
index 81f96677f4..e1602cf961 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
@@ -112,6 +112,7 @@ const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02;
const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME = 0x0d03;
const uint32_t jerrno::JERR_EFP_NOEFP = 0x0d04;
const uint32_t jerrno::JERR_EFP_EMPTY = 0x0d05;
+const uint32_t jerrno::JERR_EFP_SYMLINK = 0x0d06;
// Negative returns for some functions
const int32_t jerrno::AIO_TIMEOUT = -1;
@@ -206,6 +207,7 @@ jerrno::__init()
_err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory";
_err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size";
_err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty";
+ _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation failed";
//_err_map[] = "";
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
index e5cd7e8227..36410f9844 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
@@ -130,6 +130,7 @@ namespace journal {
static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory
static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size
static const uint32_t JERR_EFP_EMPTY; ///< EFP empty
+ static const uint32_t JERR_EFP_SYMLINK; ///< Symbolic Link operation failed
// Negative returns for some functions
static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return