diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-05-01 04:06:00 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-05-01 04:06:00 +0000 |
| commit | df37800ef7d66d4f02eb99a36e272cd967f7bf07 (patch) | |
| tree | ce8a370b85a9e918e44ad6090c562dddfb8570ef /qpid/cpp/src | |
| parent | aa428e6d93b829562024ae52feb96ae8c925f4bf (diff) | |
| download | qpid-python-df37800ef7d66d4f02eb99a36e272cd967f7bf07.tar.gz | |
QPID-4767 [legacystore] QMF commands to create a persistent queue with an illegal number of journal files or journal file size should be rejected, QPID-4794 Resizing qpid legacystore journal does not update queue arguments provided by QMF
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1477907 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/RecoverableQueue.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 13 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp | 50 |
3 files changed, 46 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/RecoverableQueue.h b/qpid/cpp/src/qpid/broker/RecoverableQueue.h index 49f05f97a1..103711d7d5 100644 --- a/qpid/cpp/src/qpid/broker/RecoverableQueue.h +++ b/qpid/cpp/src/qpid/broker/RecoverableQueue.h @@ -29,6 +29,7 @@ namespace qpid { namespace broker { class ExternalQueueStore; +class QueueSettings; /** * The interface through which messages are added back to queues on @@ -49,7 +50,9 @@ public: virtual const std::string& getName() const = 0; virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0; - virtual ExternalQueueStore* getExternalQueueStore() const = 0; + virtual ExternalQueueStore* getExternalQueueStore() const = 0; + virtual const QueueSettings& getSettings() const = 0; + virtual void addArgument(const std::string& key, const types::Variant& value) = 0; }; diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index df1c88d183..bfac48a5d3 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -56,9 +56,12 @@ public: const std::string& getName() const; void setExternalQueueStore(ExternalQueueStore* inst); ExternalQueueStore* getExternalQueueStore() const; + const QueueSettings& getSettings() const; + void addArgument(const std::string& key, const types::Variant& value); void recover(RecoverableMessage::shared_ptr msg); void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); + }; class RecoverableExchangeImpl : public RecoverableExchange @@ -219,6 +222,16 @@ ExternalQueueStore* RecoverableQueueImpl::getExternalQueueStore() const return queue->getExternalQueueStore(); } +const QueueSettings& RecoverableQueueImpl::getSettings() const +{ + return queue->getSettings(); +} + +void RecoverableQueueImpl::addArgument(const std::string& key, const types::Variant& value) +{ + queue->addArgument(key, value); +} + void RecoverableExchangeImpl::setPersistenceId(uint64_t id) { exchange->setPersistenceId(id); diff --git a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp index 8c1baa5052..c92c9828f4 100644 --- a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp @@ -21,6 +21,7 @@ #include "qpid/legacystore/MessageStoreImpl.h" +#include "qpid/broker/QueueSettings.h" #include "qpid/legacystore/BindingDbt.h" #include "qpid/legacystore/BufferValue.h" #include "qpid/legacystore/IdDbt.h" @@ -79,35 +80,27 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName) { - u_int16_t p = param; - if (p < JRNL_MIN_NUM_FILES) { - p = JRNL_MIN_NUM_FILES; - QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << JRNL_MIN_NUM_FILES << "); changing this parameter to minimum value."); - } else if (p > JRNL_MAX_NUM_FILES) { - p = JRNL_MAX_NUM_FILES; - QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value."); + if (param < JRNL_MIN_NUM_FILES || param > JRNL_MAX_NUM_FILES) { + std::ostringstream oss; + oss << "Parameter " << paramName << ": Illegal number of store journal files (" << param << "), must be " << JRNL_MIN_NUM_FILES << " to " << JRNL_MAX_NUM_FILES << " inclusive."; + THROW_STORE_EXCEPTION(oss.str()); } - return p; + return param; } u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName, const u_int32_t wCachePgSizeSblks) { - u_int32_t p = param; - u_int32_t min = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE; - u_int32_t max = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE; - if (p < min) { - p = min; - QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << min << "); changing this parameter to minimum value."); - } else if (p > max) { - p = max; - QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << max << "); changing this parameter to maximum value."); - } - if (wCachePgSizeSblks > p * JRNL_RMGR_PAGE_SIZE) { + if (param < (JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE) || (param > JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE)) { std::ostringstream oss; - oss << "Cannot create store with file size less than write page cache size. [file size = " << p << " (" << (p * JRNL_RMGR_PAGE_SIZE / 2) << " kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]"; + oss << "Parameter " << paramName << ": Illegal store journal file size (" << param << "), must be " << JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " to " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " inclusive."; THROW_STORE_EXCEPTION(oss.str()); } - return p; + if (wCachePgSizeSblks > param * JRNL_RMGR_PAGE_SIZE) { + std::ostringstream oss; + oss << "Cannot create store with file size less than write page cache size. [file size = " << param << " (" << (param * JRNL_RMGR_PAGE_SIZE / 2) << " kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]"; + THROW_STORE_EXCEPTION(oss.str()); + } + return param; } u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName, const u_int16_t jrnlFsizePgs) @@ -804,6 +797,21 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, long idcnt = 0L; // in-doubt msg count u_int64_t thisHighestRid = 0ULL; jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery + + // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting + // from recovery of a store that has had its size changed externally by the resize utility. + // If so, update the queue store settings so that QMF queries will reflect the new values. + const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings; + qpid::framing::FieldTable::ValuePtr value; + value = storeargs.get("qpid.file_count"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (u_int16_t)value->get<int>() != jQueue->num_jfiles()) { + queue->addArgument("qpid.file_count", jQueue->num_jfiles()); + } + value = storeargs.get("qpid.file_size"); + if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (u_int32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) { + queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE); + } + if (highestRid == 0ULL) highestRid = thisHighestRid; else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit |
