diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index b05172f984..c530e9cd51 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -146,6 +146,10 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ // Check for deferred delivery in a cluster. if (broker && broker->deferDelivery(name, msg)) return; + // Same thing but for the new cluster interface. + if (broker && !broker->getCluster().enqueue(*this, msg)) + return; + if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -165,7 +169,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ }else { push(msg); } - mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -199,7 +202,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); - mgntEnqStats(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); @@ -642,6 +644,7 @@ void Queue::popMsg(QueuedMessage& qmsg) void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); + if (!isRecovery) mgntEnqStats(msg); QueuedMessage qm; QueueListeners::NotificationSet copy; { @@ -687,7 +690,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ } } copy.notify(); - if (broker) broker->getCluster().enqueue(qm); } QueuedMessage Queue::getFront() @@ -868,10 +870,9 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; - if (!ctxt) { - dequeued(msg); - } + if (!ctxt) dequeued(msg); } + if (!ctxt && broker) broker->getCluster().drop(msg); // Outside lock // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -888,6 +889,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { + if (broker) broker->getCluster().drop(msg); // Outside lock Mutex::ScopedLock locker(messageLock); dequeued(msg); if (mgmtObject != 0) { @@ -913,9 +915,8 @@ void Queue::popAndDequeue() */ void Queue::dequeued(const QueuedMessage& msg) { - // Note: Cluster::dequeued does only local book-keeping, no multicast + // Note: Cluster::drop does only local book-keeping, no multicast // So OK to call here with lock held. - if (broker) broker->getCluster().dequeue(msg); if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { |
