summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp17
1 files changed, 9 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index b05172f984..c530e9cd51 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -146,6 +146,10 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
// Check for deferred delivery in a cluster.
if (broker && broker->deferDelivery(name, msg))
return;
+ // Same thing but for the new cluster interface.
+ if (broker && !broker->getCluster().enqueue(*this, msg))
+ return;
+
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -165,7 +169,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
}else {
push(msg);
}
- mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -199,7 +202,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
- mgntEnqStats(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -642,6 +644,7 @@ void Queue::popMsg(QueuedMessage& qmsg)
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
+ if (!isRecovery) mgntEnqStats(msg);
QueuedMessage qm;
QueueListeners::NotificationSet copy;
{
@@ -687,7 +690,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
}
}
copy.notify();
- if (broker) broker->getCluster().enqueue(qm);
}
QueuedMessage Queue::getFront()
@@ -868,10 +870,9 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
- if (!ctxt) {
- dequeued(msg);
- }
+ if (!ctxt) dequeued(msg);
}
+ if (!ctxt && broker) broker->getCluster().drop(msg); // Outside lock
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -888,6 +889,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
+ if (broker) broker->getCluster().drop(msg); // Outside lock
Mutex::ScopedLock locker(messageLock);
dequeued(msg);
if (mgmtObject != 0) {
@@ -913,9 +915,8 @@ void Queue::popAndDequeue()
*/
void Queue::dequeued(const QueuedMessage& msg)
{
- // Note: Cluster::dequeued does only local book-keeping, no multicast
+ // Note: Cluster::drop does only local book-keeping, no multicast
// So OK to call here with lock held.
- if (broker) broker->getCluster().dequeue(msg);
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {