summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp46
1 files changed, 23 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index d664e6e141..b090ffef43 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -109,12 +109,12 @@ Queue::~Queue()
void Queue::notifyDurableIOComplete()
{
- Listeners copy;
+ QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
- listeners.swap(copy);
+ listeners.populate(copy);
}
- for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+ copy.notify();
}
bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
@@ -187,14 +187,14 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
void Queue::requeue(const QueuedMessage& msg){
if (policy.get() && !policy->isEnqueued(msg)) return;
- Listeners copy;
+ QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
- listeners.swap(copy);
+ listeners.populate(copy);
}
- for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+ copy.notify();
}
void Queue::clearLVQIndex(const QueuedMessage& msg){
@@ -240,7 +240,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c)
if (messages.empty()) {
//no message available, register consumer for notification
//when this changes
- addListener(c);
+ listeners.addListener(c);
return false;
} else {
QueuedMessage msg = getFront();
@@ -248,7 +248,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c)
//though a message is on the queue, it has not yet been
//enqueued and so is not available for consumption yet,
//register consumer for notification when this changes
- addListener(c);
+ listeners.addListener(c);
return false;
} else {
//check that consumer has sufficient credit for the
@@ -266,7 +266,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- addListener(c);
+ listeners.addListener(c);
return false;
} else {
QueuedMessage msg = getFront();
@@ -323,15 +323,15 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
void Queue::removeListener(Consumer::shared_ptr c)
{
- Mutex::ScopedLock locker(messageLock);
- Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
- if (i != listeners.end()) listeners.erase(i);
-}
-
-void Queue::addListener(Consumer::shared_ptr c)
-{
- Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
- if (i == listeners.end()) listeners.push_back(c);
+ QueueListeners::NotificationSet set;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ listeners.removeListener(c);
+ if (messages.size()) {
+ listeners.populate(set);
+ }
+ }
+ set.notify();
}
bool Queue::dispatch(Consumer::shared_ptr c)
@@ -361,7 +361,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
return true;
}
}
- addListener(c);
+ listeners.addListener(c);
return false;
}
@@ -491,7 +491,7 @@ void Queue::popMsg(QueuedMessage& qmsg)
}
void Queue::push(boost::intrusive_ptr<Message>& msg){
- Listeners copy;
+ QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
@@ -505,17 +505,17 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
i = lvq.find(key);
if (i == lvq.end()){
messages.push_back(qm);
- listeners.swap(copy);
+ listeners.populate(copy);
lvq[key] = msg;
}else {
i->second->setReplacementMessage(msg,this);
}
}else {
messages.push_back(qm);
- listeners.swap(copy);
+ listeners.populate(copy);
}
}
- for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+ copy.notify();
}
QueuedMessage Queue::getFront()