summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp28
1 files changed, 17 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 1cc48a949e..80794f791f 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -569,9 +569,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
- if (policy.get()) {
- policy->enqueued(qm);
- }
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -605,6 +602,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (eventMgr) eventMgr->enqueued(qm);
else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
}
+ if (policy.get()) {
+ Mutex::ScopedUnlock locker(messageLock);
+ policy->enqueued(qm);
+ }
}
copy.notify();
}
@@ -792,9 +793,16 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings, bool recovering)
{
+
+ eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+
if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
- (!store || NullMessageStore::isNullStore(store))) {
- QPID_LOG(warning, "Flow to disk not valid for non-persisted queue");
+ (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+ if ( NullMessageStore::isNullStore(store)) {
+ QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
+ } else if (eventMgr && !eventMgr->isSync() ) {
+ QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
+ }
FieldTable copy(_settings);
copy.erase(QueuePolicy::typeKey);
setPolicy(QueuePolicy::createQueuePolicy(getName(), copy));
@@ -803,19 +811,19 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
}
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
- QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
lastValueQueue= _settings.get(qpidLastValueQueue);
- if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+ if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
if (lastValueQueueNoBrowse){
- QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
+ QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
lastValueQueue = lastValueQueueNoBrowse;
}
persistLastNode= _settings.get(qpidPersistLastNode);
- if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+ if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
traceId = _settings.getAsString(qpidTraceIdentity);
std::string excludeList = _settings.getAsString(qpidTraceExclude);
@@ -825,8 +833,6 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
- eventMode = _settings.getAsInt(qpidQueueEventGeneration);
-
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());