summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-05-01 04:06:00 +0000
committerKim van der Riet <kpvdr@apache.org>2013-05-01 04:06:00 +0000
commitdf37800ef7d66d4f02eb99a36e272cd967f7bf07 (patch)
treece8a370b85a9e918e44ad6090c562dddfb8570ef /qpid/cpp/src
parentaa428e6d93b829562024ae52feb96ae8c925f4bf (diff)
downloadqpid-python-df37800ef7d66d4f02eb99a36e272cd967f7bf07.tar.gz
QPID-4767 [legacystore] QMF commands to create a persistent queue with an illegal number of journal files or journal file size should be rejected, QPID-4794 Resizing qpid legacystore journal does not update queue arguments provided by QMF
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1477907 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/RecoverableQueue.h5
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp13
-rw-r--r--qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp50
3 files changed, 46 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/RecoverableQueue.h b/qpid/cpp/src/qpid/broker/RecoverableQueue.h
index 49f05f97a1..103711d7d5 100644
--- a/qpid/cpp/src/qpid/broker/RecoverableQueue.h
+++ b/qpid/cpp/src/qpid/broker/RecoverableQueue.h
@@ -29,6 +29,7 @@ namespace qpid {
namespace broker {
class ExternalQueueStore;
+class QueueSettings;
/**
* The interface through which messages are added back to queues on
@@ -49,7 +50,9 @@ public:
virtual const std::string& getName() const = 0;
virtual void setExternalQueueStore(ExternalQueueStore* inst) = 0;
- virtual ExternalQueueStore* getExternalQueueStore() const = 0;
+ virtual ExternalQueueStore* getExternalQueueStore() const = 0;
+ virtual const QueueSettings& getSettings() const = 0;
+ virtual void addArgument(const std::string& key, const types::Variant& value) = 0;
};
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index df1c88d183..bfac48a5d3 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -56,9 +56,12 @@ public:
const std::string& getName() const;
void setExternalQueueStore(ExternalQueueStore* inst);
ExternalQueueStore* getExternalQueueStore() const;
+ const QueueSettings& getSettings() const;
+ void addArgument(const std::string& key, const types::Variant& value);
void recover(RecoverableMessage::shared_ptr msg);
void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
+
};
class RecoverableExchangeImpl : public RecoverableExchange
@@ -219,6 +222,16 @@ ExternalQueueStore* RecoverableQueueImpl::getExternalQueueStore() const
return queue->getExternalQueueStore();
}
+const QueueSettings& RecoverableQueueImpl::getSettings() const
+{
+ return queue->getSettings();
+}
+
+void RecoverableQueueImpl::addArgument(const std::string& key, const types::Variant& value)
+{
+ queue->addArgument(key, value);
+}
+
void RecoverableExchangeImpl::setPersistenceId(uint64_t id)
{
exchange->setPersistenceId(id);
diff --git a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
index 8c1baa5052..c92c9828f4 100644
--- a/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
@@ -21,6 +21,7 @@
#include "qpid/legacystore/MessageStoreImpl.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/legacystore/BindingDbt.h"
#include "qpid/legacystore/BufferValue.h"
#include "qpid/legacystore/IdDbt.h"
@@ -79,35 +80,27 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en
u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName)
{
- u_int16_t p = param;
- if (p < JRNL_MIN_NUM_FILES) {
- p = JRNL_MIN_NUM_FILES;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << JRNL_MIN_NUM_FILES << "); changing this parameter to minimum value.");
- } else if (p > JRNL_MAX_NUM_FILES) {
- p = JRNL_MAX_NUM_FILES;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
+ if (param < JRNL_MIN_NUM_FILES || param > JRNL_MAX_NUM_FILES) {
+ std::ostringstream oss;
+ oss << "Parameter " << paramName << ": Illegal number of store journal files (" << param << "), must be " << JRNL_MIN_NUM_FILES << " to " << JRNL_MAX_NUM_FILES << " inclusive.";
+ THROW_STORE_EXCEPTION(oss.str());
}
- return p;
+ return param;
}
u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName, const u_int32_t wCachePgSizeSblks)
{
- u_int32_t p = param;
- u_int32_t min = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- u_int32_t max = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- if (p < min) {
- p = min;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << min << "); changing this parameter to minimum value.");
- } else if (p > max) {
- p = max;
- QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << max << "); changing this parameter to maximum value.");
- }
- if (wCachePgSizeSblks > p * JRNL_RMGR_PAGE_SIZE) {
+ if (param < (JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE) || (param > JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE)) {
std::ostringstream oss;
- oss << "Cannot create store with file size less than write page cache size. [file size = " << p << " (" << (p * JRNL_RMGR_PAGE_SIZE / 2) << " kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]";
+ oss << "Parameter " << paramName << ": Illegal store journal file size (" << param << "), must be " << JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " to " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " inclusive.";
THROW_STORE_EXCEPTION(oss.str());
}
- return p;
+ if (wCachePgSizeSblks > param * JRNL_RMGR_PAGE_SIZE) {
+ std::ostringstream oss;
+ oss << "Cannot create store with file size less than write page cache size. [file size = " << param << " (" << (param * JRNL_RMGR_PAGE_SIZE / 2) << " kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]";
+ THROW_STORE_EXCEPTION(oss.str());
+ }
+ return param;
}
u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName, const u_int16_t jrnlFsizePgs)
@@ -804,6 +797,21 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
long idcnt = 0L; // in-doubt msg count
u_int64_t thisHighestRid = 0ULL;
jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
+
+ // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
+ // from recovery of a store that has had its size changed externally by the resize utility.
+ // If so, update the queue store settings so that QMF queries will reflect the new values.
+ const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
+ qpid::framing::FieldTable::ValuePtr value;
+ value = storeargs.get("qpid.file_count");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (u_int16_t)value->get<int>() != jQueue->num_jfiles()) {
+ queue->addArgument("qpid.file_count", jQueue->num_jfiles());
+ }
+ value = storeargs.get("qpid.file_size");
+ if (value.get() != 0 && !value->empty() && value->convertsTo<int>() && (u_int32_t)value->get<int>() != jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE) {
+ queue->addArgument("qpid.file_size", jQueue->jfsize_sblks()/JRNL_RMGR_PAGE_SIZE);
+ }
+
if (highestRid == 0ULL)
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit