diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 139 |
1 files changed, 71 insertions, 68 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 090c4b4bca..4b94cd32b0 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -90,8 +90,12 @@ Queue::~Queue() void Queue::notifyDurableIOComplete() { - Mutex::ScopedLock locker(messageLock); - notify(); + Listeners copy; + { + Mutex::ScopedLock locker(messageLock); + listeners.swap(copy); + } + for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) @@ -181,10 +185,14 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ - Mutex::ScopedLock locker(messageLock); - msg.payload->enqueueComplete(); // mark the message as enqueued - messages.push_front(msg); - notify(); + Listeners copy; + { + Mutex::ScopedLock locker(messageLock); + msg.payload->enqueueComplete(); // mark the message as enqueued + messages.push_front(msg); + listeners.swap(copy); + } + for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } bool Queue::acquire(const QueuedMessage& msg) { @@ -203,16 +211,16 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } -bool Queue::getNextMessage(QueuedMessage& m, Consumer& c) +bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { - if (c.preAcquires()) { + if (c->preAcquires()) { return consumeNextMessage(m, c); } else { return browseNextMessage(m, c); } } -bool Queue::checkForMessages(Consumer& c) +bool Queue::checkForMessages(Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); if (messages.empty()) { @@ -233,12 +241,12 @@ bool Queue::checkForMessages(Consumer& c) //message (if it does not, no need to register it for //notification as the consumer itself will handle the //credit allocation required to change this condition). - return c.accept(msg.payload); + return c->accept(msg.payload); } } } -bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) +bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { Mutex::ScopedLock locker(messageLock); @@ -254,8 +262,8 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) return false; } - if (c.filter(msg.payload)) { - if (c.accept(msg.payload)) { + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { m = msg; messages.pop_front(); return true; @@ -274,14 +282,14 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) } -bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c) +bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { QueuedMessage msg(this); while (seek(msg, c)) { - if (c.filter(msg.payload)) { - if (c.accept(msg.payload)) { + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { //consumer wants the message - c.position = msg.position; + c->position = msg.position; m = msg; return true; } else { @@ -291,59 +299,47 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c) } } else { //consumer will never want this message, continue seeking - c.position = msg.position; + c->position = msg.position; QPID_LOG(debug, "Browser skipping message from '" << name << "'"); } } return false; } -/** - * notify listeners that there may be messages to process - */ -void Queue::notify() -{ - if (listeners.empty()) return; - - Listeners copy(listeners); - listeners.clear(); - for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify)); -} - -void Queue::removeListener(Consumer& c) +void Queue::removeListener(Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); - Listeners::iterator i = std::find(listeners.begin(), listeners.end(), &c); + Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c); if (i != listeners.end()) listeners.erase(i); } -void Queue::addListener(Consumer& c) +void Queue::addListener(Consumer::shared_ptr c) { - Listeners::iterator i = std::find(listeners.begin(), listeners.end(), &c); - if (i == listeners.end()) listeners.push_back(&c); + Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c); + if (i == listeners.end()) listeners.push_back(c); } -bool Queue::dispatch(Consumer& c) +bool Queue::dispatch(Consumer::shared_ptr c) { QueuedMessage msg(this); if (getNextMessage(msg, c)) { - c.deliver(msg); + c->deliver(msg); return true; } else { return false; } } -bool Queue::seek(QueuedMessage& msg, Consumer& c) { +bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > c.position) { - if (c.position < messages.front().position) { + if (!messages.empty() && messages.back().position > c->position) { + if (c->position < messages.front().position) { msg = messages.front(); return true; } else { //TODO: can improve performance of this search, for now just searching linearly from end Messages::reverse_iterator pos; - for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c.position; i++) { + for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) { pos = i; } msg = *pos; @@ -354,7 +350,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) { return false; } -void Queue::consume(Consumer& c, bool requestExclusive){ +void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ Mutex::ScopedLock locker(consumerLock); if(exclusive) { throw ResourceLockedException( @@ -364,7 +360,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){ throw ResourceLockedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } else { - exclusive = c.getSession(); + exclusive = c->getSession(); } } consumerCount++; @@ -372,7 +368,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){ mgmtObject->inc_consumerCount (); } -void Queue::cancel(Consumer& c){ +void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); Mutex::ScopedLock locker(consumerLock); consumerCount--; @@ -415,35 +411,40 @@ uint32_t Queue::purge(const uint32_t purge_request){ } void Queue::push(boost::intrusive_ptr<Message>& msg){ - Mutex::ScopedLock locker(messageLock); - messages.push_back(QueuedMessage(this, msg, ++sequence)); - if (policy.get()) { - policy->enqueued(msg->contentSize()); - if (policy->limitExceeded()) { - if (!policyExceeded) { - policyExceeded = true; - QPID_LOG(info, "Queue size exceeded policy for " << name); - } - if (store) { - QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory"); - msg->releaseContent(store); + Listeners copy; + { + Mutex::ScopedLock locker(messageLock); + messages.push_back(QueuedMessage(this, msg, ++sequence)); + if (policy.get()) { + policy->enqueued(msg->contentSize()); + if (policy->limitExceeded()) { + if (!policyExceeded) { + policyExceeded = true; + QPID_LOG(info, "Queue size exceeded policy for " << name); + } + if (store) { + QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory"); + msg->releaseContent(store); + } else { + QPID_LOG(error, "Message " << msg << " on " << name + << " exceeds the policy for the queue but can't be released from memory as the queue is not durable"); + throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy)); + } } else { - QPID_LOG(error, "Message " << msg << " on " << name - << " exceeds the policy for the queue but can't be released from memory as the queue is not durable"); - throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy)); - } - } else { - if (policyExceeded) { - policyExceeded = false; - QPID_LOG(info, "Queue size within policy for " << name); + if (policyExceeded) { + policyExceeded = false; + QPID_LOG(info, "Queue size within policy for " << name); + } } } + listeners.swap(copy); } - notify(); + for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } /** function only provided for unit tests, or code not in critical message path */ -uint32_t Queue::getMessageCount() const{ +uint32_t Queue::getMessageCount() const +{ Mutex::ScopedLock locker(messageLock); uint32_t count =0; @@ -454,12 +455,14 @@ uint32_t Queue::getMessageCount() const{ return count; } -uint32_t Queue::getConsumerCount() const{ +uint32_t Queue::getConsumerCount() const +{ Mutex::ScopedLock locker(consumerLock); return consumerCount; } -bool Queue::canAutoDelete() const{ +bool Queue::canAutoDelete() const +{ Mutex::ScopedLock locker(consumerLock); return autodelete && !consumerCount; } |