summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp17
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h10
2 files changed, 17 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 70eac27f48..68c49e4d90 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -46,9 +46,6 @@ namespace linearstore{
const std::string MessageStoreImpl::storeTopLevelDir("qls"); // Sets the top-level store dir name
-// FIXME aconway 2010-03-09: was 10
-qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 10ms
-qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) :
@@ -60,6 +57,7 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en
tplWCachePgSizeSblks(0),
tplWCacheNumPages(0),
highestRid(0),
+ journalFlushTimeout(defJournalFlushTimeoutNs),
isInit(false),
envPath(envpath_),
broker(broker_),
@@ -172,6 +170,7 @@ bool MessageStoreImpl::init(const qpid::Options* options_)
qpid::linearstore::journal::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
+ journalFlushTimeout = opts->journalFlushTimeout;
// Pass option values to init()
return init(opts->storeDir, efpPartition, efpFilePoolSize_kib, opts->truncateFlag, jrnlWrCachePageSizeKib,
@@ -213,6 +212,7 @@ bool MessageStoreImpl::init(const std::string& storeDir_,
QLS_LOG(info, "> EFP partition: " << defaultEfpPartitionNumber);
QLS_LOG(info, "> EFP file size pool: " << defaultEfpFileSize_kib << " (KiB)");
QLS_LOG(info, "> Overwrite before return to EFP: " << (overwriteBeforeReturnFlag?"True":"False"));
+ QLS_LOG(info, "> Maximum journal flush time: " << journalFlushTimeout);
return isInit;
}
@@ -267,7 +267,7 @@ void MessageStoreImpl::init(const bool truncateFlag)
// NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure.
// However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the
// TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026
- tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), jrnlLog, defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
+ tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), jrnlLog, defJournalGetEventsTimeoutNs, journalFlushTimeout, 0));
isInit = true;
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
@@ -411,7 +411,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
}
jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()), jrnlLog,
- defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+ defJournalGetEventsTimeoutNs, journalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
@@ -727,7 +727,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
break;
}
jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName),jrnlLog,
- defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+ defJournalGetEventsTimeoutNs, journalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
@@ -1527,7 +1527,8 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
tplWCachePageSizeKib(defTplWCachePageSizeKib),
efpPartition(defEfpPartition),
efpFileSizeKib(defEfpFileSizeKib),
- overwriteBeforeReturnFlag(defOverwriteBeforeReturnFlag)
+ overwriteBeforeReturnFlag(defOverwriteBeforeReturnFlag),
+ journalFlushTimeout(defJournalFlushTimeoutNs)
{
addOptions()
("store-dir", qpid::optValue(storeDir, "DIR"),
@@ -1553,6 +1554,8 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
"it to the Empty File Pool. When not in use (the default), then old message data remains "
"in the file, but is overwritten on next use. This option should only be used where security "
"considerations justify it as it makes the store somewhat slower.")
+ ("journal-flush-timeout", qpid::optValue(journalFlushTimeout, "SECONDS"),
+ "Maximum time to wait to flush journal")
;
}
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
index 236fcf2cf8..8b11e115b6 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
@@ -30,6 +30,7 @@
#include "qpid/linearstore/journal/jcfg.h"
#include "qpid/linearstore/journal/EmptyFilePoolTypes.h"
#include "qpid/linearstore/PreparedTransaction.h"
+#include "qpid/sys/Time.h"
#include "qmf/org/apache/qpid/linearstore/Store.h"
@@ -83,9 +84,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
uint16_t efpPartition;
uint64_t efpFileSizeKib;
bool overwriteBeforeReturnFlag;
+ qpid::sys::Duration journalFlushTimeout;
};
- protected:
+ private:
typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
typedef std::map<uint64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
@@ -105,8 +107,9 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
static const bool defOverwriteBeforeReturnFlag = false;
static const std::string storeTopLevelDir;
- static qpid::sys::Duration defJournalGetEventsTimeout;
- static qpid::sys::Duration defJournalFlushTimeout;
+ // FIXME aconway 2010-03-09: was 10ms
+ static const uint64_t defJournalGetEventsTimeoutNs = 1 * 1000000; // 1ms
+ static const uint64_t defJournalFlushTimeoutNs = 500 * 1000000; // 500ms
std::list<db_ptr> dbs;
dbEnv_ptr dbenv;
@@ -137,6 +140,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
uint32_t tplWCachePgSizeSblks;
uint16_t tplWCacheNumPages;
uint64_t highestRid;
+ qpid::sys::Duration journalFlushTimeout;
bool isInit;
const char* envPath;
qpid::broker::Broker* broker;