diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 46 |
1 files changed, 23 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index d664e6e141..b090ffef43 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -109,12 +109,12 @@ Queue::~Queue() void Queue::notifyDurableIOComplete() { - Listeners copy; + QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); - listeners.swap(copy); + listeners.populate(copy); } - for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); + copy.notify(); } bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) @@ -187,14 +187,14 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ void Queue::requeue(const QueuedMessage& msg){ if (policy.get() && !policy->isEnqueued(msg)) return; - Listeners copy; + QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); - listeners.swap(copy); + listeners.populate(copy); } - for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); + copy.notify(); } void Queue::clearLVQIndex(const QueuedMessage& msg){ @@ -240,7 +240,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) if (messages.empty()) { //no message available, register consumer for notification //when this changes - addListener(c); + listeners.addListener(c); return false; } else { QueuedMessage msg = getFront(); @@ -248,7 +248,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) //though a message is on the queue, it has not yet been //enqueued and so is not available for consumption yet, //register consumer for notification when this changes - addListener(c); + listeners.addListener(c); return false; } else { //check that consumer has sufficient credit for the @@ -266,7 +266,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Mutex::ScopedLock locker(messageLock); if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); - addListener(c); + listeners.addListener(c); return false; } else { QueuedMessage msg = getFront(); @@ -323,15 +323,15 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) void Queue::removeListener(Consumer::shared_ptr c) { - Mutex::ScopedLock locker(messageLock); - Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c); - if (i != listeners.end()) listeners.erase(i); -} - -void Queue::addListener(Consumer::shared_ptr c) -{ - Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c); - if (i == listeners.end()) listeners.push_back(c); + QueueListeners::NotificationSet set; + { + Mutex::ScopedLock locker(messageLock); + listeners.removeListener(c); + if (messages.size()) { + listeners.populate(set); + } + } + set.notify(); } bool Queue::dispatch(Consumer::shared_ptr c) @@ -361,7 +361,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { return true; } } - addListener(c); + listeners.addListener(c); return false; } @@ -491,7 +491,7 @@ void Queue::popMsg(QueuedMessage& qmsg) } void Queue::push(boost::intrusive_ptr<Message>& msg){ - Listeners copy; + QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); @@ -505,17 +505,17 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ i = lvq.find(key); if (i == lvq.end()){ messages.push_back(qm); - listeners.swap(copy); + listeners.populate(copy); lvq[key] = msg; }else { i->second->setReplacementMessage(msg,this); } }else { messages.push_back(qm); - listeners.swap(copy); + listeners.populate(copy); } } - for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); + copy.notify(); } QueuedMessage Queue::getFront() |
