summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp139
1 files changed, 71 insertions, 68 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 090c4b4bca..4b94cd32b0 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -90,8 +90,12 @@ Queue::~Queue()
void Queue::notifyDurableIOComplete()
{
- Mutex::ScopedLock locker(messageLock);
- notify();
+ Listeners copy;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ listeners.swap(copy);
+ }
+ for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
@@ -181,10 +185,14 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
void Queue::requeue(const QueuedMessage& msg){
- Mutex::ScopedLock locker(messageLock);
- msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.push_front(msg);
- notify();
+ Listeners copy;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ msg.payload->enqueueComplete(); // mark the message as enqueued
+ messages.push_front(msg);
+ listeners.swap(copy);
+ }
+ for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
bool Queue::acquire(const QueuedMessage& msg) {
@@ -203,16 +211,16 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
-bool Queue::getNextMessage(QueuedMessage& m, Consumer& c)
+bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
- if (c.preAcquires()) {
+ if (c->preAcquires()) {
return consumeNextMessage(m, c);
} else {
return browseNextMessage(m, c);
}
}
-bool Queue::checkForMessages(Consumer& c)
+bool Queue::checkForMessages(Consumer::shared_ptr c)
{
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
@@ -233,12 +241,12 @@ bool Queue::checkForMessages(Consumer& c)
//message (if it does not, no need to register it for
//notification as the consumer itself will handle the
//credit allocation required to change this condition).
- return c.accept(msg.payload);
+ return c->accept(msg.payload);
}
}
}
-bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
+bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
Mutex::ScopedLock locker(messageLock);
@@ -254,8 +262,8 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
return false;
}
- if (c.filter(msg.payload)) {
- if (c.accept(msg.payload)) {
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
m = msg;
messages.pop_front();
return true;
@@ -274,14 +282,14 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
}
-bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c)
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
QueuedMessage msg(this);
while (seek(msg, c)) {
- if (c.filter(msg.payload)) {
- if (c.accept(msg.payload)) {
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
//consumer wants the message
- c.position = msg.position;
+ c->position = msg.position;
m = msg;
return true;
} else {
@@ -291,59 +299,47 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c)
}
} else {
//consumer will never want this message, continue seeking
- c.position = msg.position;
+ c->position = msg.position;
QPID_LOG(debug, "Browser skipping message from '" << name << "'");
}
}
return false;
}
-/**
- * notify listeners that there may be messages to process
- */
-void Queue::notify()
-{
- if (listeners.empty()) return;
-
- Listeners copy(listeners);
- listeners.clear();
- for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify));
-}
-
-void Queue::removeListener(Consumer& c)
+void Queue::removeListener(Consumer::shared_ptr c)
{
Mutex::ScopedLock locker(messageLock);
- Listeners::iterator i = std::find(listeners.begin(), listeners.end(), &c);
+ Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
if (i != listeners.end()) listeners.erase(i);
}
-void Queue::addListener(Consumer& c)
+void Queue::addListener(Consumer::shared_ptr c)
{
- Listeners::iterator i = std::find(listeners.begin(), listeners.end(), &c);
- if (i == listeners.end()) listeners.push_back(&c);
+ Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+ if (i == listeners.end()) listeners.push_back(c);
}
-bool Queue::dispatch(Consumer& c)
+bool Queue::dispatch(Consumer::shared_ptr c)
{
QueuedMessage msg(this);
if (getNextMessage(msg, c)) {
- c.deliver(msg);
+ c->deliver(msg);
return true;
} else {
return false;
}
}
-bool Queue::seek(QueuedMessage& msg, Consumer& c) {
+bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > c.position) {
- if (c.position < messages.front().position) {
+ if (!messages.empty() && messages.back().position > c->position) {
+ if (c->position < messages.front().position) {
msg = messages.front();
return true;
} else {
//TODO: can improve performance of this search, for now just searching linearly from end
Messages::reverse_iterator pos;
- for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c.position; i++) {
+ for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) {
pos = i;
}
msg = *pos;
@@ -354,7 +350,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) {
return false;
}
-void Queue::consume(Consumer& c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
throw ResourceLockedException(
@@ -364,7 +360,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){
throw ResourceLockedException(
QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
} else {
- exclusive = c.getSession();
+ exclusive = c->getSession();
}
}
consumerCount++;
@@ -372,7 +368,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){
mgmtObject->inc_consumerCount ();
}
-void Queue::cancel(Consumer& c){
+void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
@@ -415,35 +411,40 @@ uint32_t Queue::purge(const uint32_t purge_request){
}
void Queue::push(boost::intrusive_ptr<Message>& msg){
- Mutex::ScopedLock locker(messageLock);
- messages.push_back(QueuedMessage(this, msg, ++sequence));
- if (policy.get()) {
- policy->enqueued(msg->contentSize());
- if (policy->limitExceeded()) {
- if (!policyExceeded) {
- policyExceeded = true;
- QPID_LOG(info, "Queue size exceeded policy for " << name);
- }
- if (store) {
- QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory");
- msg->releaseContent(store);
+ Listeners copy;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages.push_back(QueuedMessage(this, msg, ++sequence));
+ if (policy.get()) {
+ policy->enqueued(msg->contentSize());
+ if (policy->limitExceeded()) {
+ if (!policyExceeded) {
+ policyExceeded = true;
+ QPID_LOG(info, "Queue size exceeded policy for " << name);
+ }
+ if (store) {
+ QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory");
+ msg->releaseContent(store);
+ } else {
+ QPID_LOG(error, "Message " << msg << " on " << name
+ << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
+ throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
+ }
} else {
- QPID_LOG(error, "Message " << msg << " on " << name
- << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
- throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
- }
- } else {
- if (policyExceeded) {
- policyExceeded = false;
- QPID_LOG(info, "Queue size within policy for " << name);
+ if (policyExceeded) {
+ policyExceeded = false;
+ QPID_LOG(info, "Queue size within policy for " << name);
+ }
}
}
+ listeners.swap(copy);
}
- notify();
+ for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
/** function only provided for unit tests, or code not in critical message path */
-uint32_t Queue::getMessageCount() const{
+uint32_t Queue::getMessageCount() const
+{
Mutex::ScopedLock locker(messageLock);
uint32_t count =0;
@@ -454,12 +455,14 @@ uint32_t Queue::getMessageCount() const{
return count;
}
-uint32_t Queue::getConsumerCount() const{
+uint32_t Queue::getConsumerCount() const
+{
Mutex::ScopedLock locker(consumerLock);
return consumerCount;
}
-bool Queue::canAutoDelete() const{
+bool Queue::canAutoDelete() const
+{
Mutex::ScopedLock locker(consumerLock);
return autodelete && !consumerCount;
}