diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 1cc48a949e..80794f791f 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -569,9 +569,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); - if (policy.get()) { - policy->enqueued(qm); - } if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; @@ -605,6 +602,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (eventMgr) eventMgr->enqueued(qm); else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } + if (policy.get()) { + Mutex::ScopedUnlock locker(messageLock); + policy->enqueued(qm); + } } copy.notify(); } @@ -792,9 +793,16 @@ void Queue::create(const FieldTable& _settings) void Queue::configure(const FieldTable& _settings, bool recovering) { + + eventMode = _settings.getAsInt(qpidQueueEventGeneration); + if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && - (!store || NullMessageStore::isNullStore(store))) { - QPID_LOG(warning, "Flow to disk not valid for non-persisted queue"); + (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) { + if ( NullMessageStore::isNullStore(store)) { + QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); + } else if (eventMgr && !eventMgr->isSync() ) { + QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName()); + } FieldTable copy(_settings); copy.erase(QueuePolicy::typeKey); setPolicy(QueuePolicy::createQueuePolicy(getName(), copy)); @@ -803,19 +811,19 @@ void Queue::configure(const FieldTable& _settings, bool recovering) } //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); - QPID_LOG(debug, "Configured queue with no-local=" << noLocal); + QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); lastValueQueue= _settings.get(qpidLastValueQueue); - if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue"); + if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName()); lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse); if (lastValueQueueNoBrowse){ - QPID_LOG(debug, "Configured queue as Last Value Queue No Browse"); + QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName()); lastValueQueue = lastValueQueueNoBrowse; } persistLastNode= _settings.get(qpidPersistLastNode); - if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); + if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName()); traceId = _settings.getAsString(qpidTraceIdentity); std::string excludeList = _settings.getAsString(qpidTraceExclude); @@ -825,8 +833,6 @@ void Queue::configure(const FieldTable& _settings, bool recovering) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); - eventMode = _settings.getAsInt(qpidQueueEventGeneration); - FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); |
