diff options
| author | Gordon Sim <gsim@apache.org> | 2009-02-12 11:43:51 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-02-12 11:43:51 +0000 |
| commit | 3f7745fb58b028058a860d7e95f308404728f09f (patch) | |
| tree | dd28c178bed1a4f38dbb05e755cee7448e87600c /cpp/src/qpid/broker | |
| parent | 30072b3b418ab9b0af293482878591b7beb6e9bf (diff) | |
| download | qpid-python-3f7745fb58b028058a860d7e95f308404728f09f.tar.gz | |
QPID-1660: If selected consumer can't take a message, ensure others are notified of message availability.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@743694 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 32 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 1 |
4 files changed, 42 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index f3cdc03f7d..c9ee7f394f 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -256,10 +256,30 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } +void Queue::notifyListener() +{ + QueueListeners::NotificationSet set; + { + Mutex::ScopedLock locker(messageLock); + if (messages.size()) { + listeners.populate(set); + } + } + set.notify(); +} + bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { if (c->preAcquires()) { - return consumeNextMessage(m, c); + switch (consumeNextMessage(m, c)) { + case CONSUMED: + return true; + case CANT_CONSUME: + notifyListener();//let someone else try + case NO_MESSAGES: + default: + return false; + } } else { return browseNextMessage(m, c); } @@ -291,14 +311,14 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) } } -bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { Mutex::ScopedLock locker(messageLock); if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); - return false; + return NO_MESSAGES; } else { QueuedMessage msg = getFront(); if (msg.payload->hasExpired()) { @@ -311,16 +331,16 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) if (c->accept(msg.payload)) { m = msg; popMsg(msg); - return true; + return CONSUMED; } else { //message(s) are available but consumer hasn't got enough credit QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - return false; + return CANT_CONSUME; } } else { //consumer will never want this message QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - return false; + return CANT_CONSUME; } } } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index e4bcded8bd..61fbd45de8 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -68,6 +68,7 @@ namespace qpid { typedef std::deque<QueuedMessage> Messages; typedef std::map<string,boost::intrusive_ptr<Message> > LVQ; + enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; const string name; const bool autodelete; @@ -104,8 +105,9 @@ namespace qpid { void setPolicy(std::auto_ptr<QueuePolicy> policy); bool seek(QueuedMessage& msg, Consumer::shared_ptr position); bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - bool consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + void notifyListener(); void removeListener(Consumer::shared_ptr); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 13a8c649d2..4f751e43b7 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -527,9 +527,19 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) } } +bool SemanticState::ConsumerImpl::haveCredit() +{ + if (msgCredit) { + return true; + } else { + blocked = true; + return false; + } +} + void SemanticState::ConsumerImpl::flush() { - while(queue->dispatch(shared_from_this())) + while(haveCredit() && queue->dispatch(shared_from_this())) ; stop(); } @@ -587,7 +597,7 @@ bool SemanticState::ConsumerImpl::hasOutput() { bool SemanticState::ConsumerImpl::doOutput() { - return queue->dispatch(shared_from_this()); + return haveCredit() && queue->dispatch(shared_from_this()); } void SemanticState::ConsumerImpl::enableNotify() diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index a1bee23fd2..c31a6978c9 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -82,6 +82,7 @@ class SemanticState : public sys::OutputTask, bool checkCredit(boost::intrusive_ptr<Message>& msg); void allocateCredit(boost::intrusive_ptr<Message>& msg); + bool haveCredit(); public: typedef boost::shared_ptr<ConsumerImpl> shared_ptr; |
