diff options
author | Andrew Stitcher <astitcher@apache.org> | 2015-09-09 22:47:53 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2015-09-09 22:47:53 +0000 |
commit | 664b5f13a6f9e88e66a0564d15ce7a006777b52c (patch) | |
tree | 36b73a6c7b2069db52b78cba36f1c767a372fc2a | |
parent | d7ac11084dcc43b47b8f514151235d6fa9efe186 (diff) | |
download | qpid-python-664b5f13a6f9e88e66a0564d15ce7a006777b52c.tar.gz |
QPID-6730: Make the linearstore journal timeout configurable
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1702130 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h | 10 |
2 files changed, 17 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 70eac27f48..68c49e4d90 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -46,9 +46,6 @@ namespace linearstore{ const std::string MessageStoreImpl::storeTopLevelDir("qls"); // Sets the top-level store dir name -// FIXME aconway 2010-03-09: was 10 -qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 10ms -qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s qpid::sys::Mutex TxnCtxt::globalSerialiser; MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath_) : @@ -60,6 +57,7 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en tplWCachePgSizeSblks(0), tplWCacheNumPages(0), highestRid(0), + journalFlushTimeout(defJournalFlushTimeoutNs), isInit(false), envPath(envpath_), broker(broker_), @@ -172,6 +170,7 @@ bool MessageStoreImpl::init(const qpid::Options* options_) qpid::linearstore::journal::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size"); uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"); uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"); + journalFlushTimeout = opts->journalFlushTimeout; // Pass option values to init() return init(opts->storeDir, efpPartition, efpFilePoolSize_kib, opts->truncateFlag, jrnlWrCachePageSizeKib, @@ -213,6 +212,7 @@ bool MessageStoreImpl::init(const std::string& storeDir_, QLS_LOG(info, "> EFP partition: " << defaultEfpPartitionNumber); QLS_LOG(info, "> EFP file size pool: " << defaultEfpFileSize_kib << " (KiB)"); QLS_LOG(info, "> Overwrite before return to EFP: " << (overwriteBeforeReturnFlag?"True":"False")); + QLS_LOG(info, "> Maximum journal flush time: " << journalFlushTimeout); return isInit; } @@ -267,7 +267,7 @@ void MessageStoreImpl::init(const bool truncateFlag) // NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure. // However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the // TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026 - tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), jrnlLog, defJournalGetEventsTimeout, defJournalFlushTimeout, 0)); + tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), jrnlLog, defJournalGetEventsTimeoutNs, journalFlushTimeout, 0)); isInit = true; } catch (const DbException& e) { if (e.get_errno() == DB_VERSION_MISMATCH) @@ -411,7 +411,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, } jQueue = new JournalImpl(broker->getTimer(), queue_.getName(), getJrnlDir(queue_.getName()), jrnlLog, - defJournalGetEventsTimeout, defJournalFlushTimeout, agent, + defJournalGetEventsTimeoutNs, journalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { qpid::sys::Mutex::ScopedLock sl(journalListLock); @@ -727,7 +727,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, break; } jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlDir(queueName),jrnlLog, - defJournalGetEventsTimeout, defJournalFlushTimeout, agent, + defJournalGetEventsTimeoutNs, journalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { qpid::sys::Mutex::ScopedLock sl(journalListLock); @@ -1527,7 +1527,8 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) : tplWCachePageSizeKib(defTplWCachePageSizeKib), efpPartition(defEfpPartition), efpFileSizeKib(defEfpFileSizeKib), - overwriteBeforeReturnFlag(defOverwriteBeforeReturnFlag) + overwriteBeforeReturnFlag(defOverwriteBeforeReturnFlag), + journalFlushTimeout(defJournalFlushTimeoutNs) { addOptions() ("store-dir", qpid::optValue(storeDir, "DIR"), @@ -1553,6 +1554,8 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) : "it to the Empty File Pool. When not in use (the default), then old message data remains " "in the file, but is overwritten on next use. This option should only be used where security " "considerations justify it as it makes the store somewhat slower.") + ("journal-flush-timeout", qpid::optValue(journalFlushTimeout, "SECONDS"), + "Maximum time to wait to flush journal") ; } diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h index 236fcf2cf8..8b11e115b6 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -30,6 +30,7 @@ #include "qpid/linearstore/journal/jcfg.h" #include "qpid/linearstore/journal/EmptyFilePoolTypes.h" #include "qpid/linearstore/PreparedTransaction.h" +#include "qpid/sys/Time.h" #include "qmf/org/apache/qpid/linearstore/Store.h" @@ -83,9 +84,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem uint16_t efpPartition; uint64_t efpFileSizeKib; bool overwriteBeforeReturnFlag; + qpid::sys::Duration journalFlushTimeout; }; - protected: + private: typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index; typedef std::map<uint64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index; typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index; @@ -105,8 +107,9 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem static const bool defOverwriteBeforeReturnFlag = false; static const std::string storeTopLevelDir; - static qpid::sys::Duration defJournalGetEventsTimeout; - static qpid::sys::Duration defJournalFlushTimeout; + // FIXME aconway 2010-03-09: was 10ms + static const uint64_t defJournalGetEventsTimeoutNs = 1 * 1000000; // 1ms + static const uint64_t defJournalFlushTimeoutNs = 500 * 1000000; // 500ms std::list<db_ptr> dbs; dbEnv_ptr dbenv; @@ -137,6 +140,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem uint32_t tplWCachePgSizeSblks; uint16_t tplWCacheNumPages; uint64_t highestRid; + qpid::sys::Duration journalFlushTimeout; bool isInit; const char* envPath; qpid::broker::Broker* broker; |