diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 137 |
1 files changed, 32 insertions, 105 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index c530e9cd51..e59857462c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -20,7 +20,6 @@ */ #include "qpid/broker/Broker.h" -#include "qpid/broker/Cluster.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" @@ -146,10 +145,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ // Check for deferred delivery in a cluster. if (broker && broker->deferDelivery(name, msg)) return; - // Same thing but for the new cluster interface. - if (broker && !broker->getCluster().enqueue(*this, msg)) - return; - if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -169,6 +164,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ }else { push(msg); } + mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -202,6 +198,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); + mgntEnqStats(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); @@ -227,7 +224,6 @@ void Queue::requeue(const QueuedMessage& msg){ } } } - if (broker) broker->getCluster().release(msg); copy.notify(); } @@ -240,22 +236,8 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ } } -// Inform the cluster of an acquired message on exit from a function -// that does the acquiring. The calling function should set qmsg -// to the acquired message. -struct ClusterAcquireOnExit { - Broker* broker; - QueuedMessage qmsg; - ClusterAcquireOnExit(Broker* b) : broker(b) {} - ~ClusterAcquireOnExit() { - if (broker && qmsg.queue) broker->getCluster().acquire(qmsg); - } -}; - bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { - ClusterAcquireOnExit willAcquire(broker); - Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); @@ -266,18 +248,16 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess if (lastValueQueue) { clearLVQIndex(*i); } - QPID_LOG(debug, "Acquired message at " << i->position << " from " << name); - willAcquire.qmsg = *i; + QPID_LOG(debug, + "Acquired message at " << i->position << " from " << name); messages.erase(i); return true; - } + } QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); return false; } bool Queue::acquire(const QueuedMessage& msg) { - ClusterAcquireOnExit acquire(broker); - Mutex::ScopedLock locker(messageLock); assertClusterSafe(); @@ -285,17 +265,16 @@ bool Queue::acquire(const QueuedMessage& msg) { Messages::iterator i = findAt(msg.position); if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set (!lastValueQueue || - (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 - ) { + (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 + ) { clearLVQIndex(msg); QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); - acquire.qmsg = *i; messages.erase(i); return true; - } + } QPID_LOG(debug, "Acquire failed for " << msg.position); return false; @@ -335,8 +314,6 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { - ClusterAcquireOnExit willAcquire(broker); // Outside the lock - Mutex::ScopedLock locker(messageLock); if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); @@ -353,7 +330,6 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - willAcquire.qmsg = msg; popMsg(msg); return CONSUMED; } else { @@ -475,51 +451,40 @@ QueuedMessage Queue::find(SequenceNumber pos) const { return QueuedMessage(); } -void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) { +void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ assertClusterSafe(); - size_t consumers; - { - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { + Mutex::ScopedLock locker(consumerLock); + if(exclusive) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(consumerCount) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); - } + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); } - consumers = ++consumerCount; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); } - if (broker) broker->getCluster().consume(*this, consumers); + consumerCount++; + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); - size_t consumers; - { - Mutex::ScopedLock locker(consumerLock); - consumers = --consumerCount; - if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); - } - if (broker) broker->getCluster().cancel(*this, consumers); + Mutex::ScopedLock locker(consumerLock); + consumerCount--; + if(exclusive) exclusive = 0; + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); } QueuedMessage Queue::get(){ - ClusterAcquireOnExit acquire(broker); // Outside lock - Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); if(!messages.empty()){ msg = getFront(); - acquire.qmsg = msg; popMsg(msg); } return msg; @@ -644,12 +609,10 @@ void Queue::popMsg(QueuedMessage& qmsg) void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); - if (!isRecovery) mgntEnqStats(msg); - QueuedMessage qm; QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); - qm = QueuedMessage(this, msg, ++sequence); + QueuedMessage qm(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; @@ -666,14 +629,12 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); if (!old) old = i->second; i->second->setReplacementMessage(msg,this); - // FIXME aconway 2010-10-15: it is incorrect to use qm.position below - // should be using the position of the message being replaced. if (isRecovery) { //can't issue new requests for the store until //recovery is complete pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); } else { - Mutex::ScopedUnlock u(messageLock); + Mutex::ScopedUnlock u(messageLock); dequeue(0, QueuedMessage(qm.queue, old, qm.position)); } } @@ -831,48 +792,19 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) if (policy.get()) policy->enqueueAborted(msg); } -void Queue::accept(TransactionContext* ctxt, const QueuedMessage& msg) { - if (broker) broker->getCluster().accept(msg); - dequeue(ctxt, msg); -} - -struct ScopedClusterReject { - Broker* broker; - const QueuedMessage& qmsg; - ScopedClusterReject(Broker* b, const QueuedMessage& m) : broker(b), qmsg(m) { - if (broker) broker->getCluster().reject(qmsg); - } - ~ScopedClusterReject() { - if (broker) broker->getCluster().rejected(qmsg); - } -}; - -void Queue::reject(const QueuedMessage &msg) { - ScopedClusterReject scr(broker, msg); - Exchange::shared_ptr alternate = getAlternateExchange(); - if (alternate) { - DeliverableMessage delivery(msg.payload); - alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); - QPID_LOG(info, "Routed rejected message from " << getName() << " to " - << alternate->getName()); - } else { - //just drop it - QPID_LOG(info, "Dropping rejected message from " << getName()); - } - dequeue(0, msg); -} - // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); if (!u.acquired) return false; + { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; - if (!ctxt) dequeued(msg); + if (!ctxt) { + dequeued(msg); + } } - if (!ctxt && broker) broker->getCluster().drop(msg); // Outside lock // This check prevents messages which have been forced persistent on one queue from dequeuing // from another on which no forcing has taken place and thus causing a store error. bool fp = msg.payload->isForcedPersistent(); @@ -889,7 +821,6 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { - if (broker) broker->getCluster().drop(msg); // Outside lock Mutex::ScopedLock locker(messageLock); dequeued(msg); if (mgmtObject != 0) { @@ -915,8 +846,6 @@ void Queue::popAndDequeue() */ void Queue::dequeued(const QueuedMessage& msg) { - // Note: Cluster::drop does only local book-keeping, no multicast - // So OK to call here with lock held. if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { @@ -932,7 +861,6 @@ void Queue::create(const FieldTable& _settings) store->create(*this, _settings); } configure(_settings); - if (broker) broker->getCluster().create(*this); } void Queue::configure(const FieldTable& _settings, bool recovering) @@ -1006,7 +934,6 @@ void Queue::destroy() store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue } - if (broker) broker->getCluster().destroy(*this); } void Queue::notifyDeleted() |
