diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /cpp/src/qpid/broker/Queue.cpp | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 727 |
1 files changed, 503 insertions, 224 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 27c1cc4ad7..4627b1409a 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -31,7 +31,10 @@ #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/ThresholdAlerts.h" +#include "qpid/broker/FifoDistributor.h" +#include "qpid/broker/MessageGroupManager.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" @@ -41,6 +44,7 @@ #include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" @@ -64,7 +68,7 @@ using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidMaxSize("qpid.max_size"); const std::string qpidMaxCount("qpid.max_count"); @@ -86,16 +90,16 @@ const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; } -Queue::Queue(const string& _name, bool _autodelete, +Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, Manageable* parent, Broker* b) : - name(_name), + name(_name), autodelete(_autodelete), store(_store), - owner(_owner), + owner(_owner), consumerCount(0), exclusive(0), noLocal(false), @@ -110,7 +114,8 @@ Queue::Queue(const string& _name, bool _autodelete, broker(b), deleted(false), barrier(*this), - autoDeleteTimeout(0) + autoDeleteTimeout(0), + allocator(new FifoDistributor( *messages )) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -163,13 +168,8 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ //drop message QPID_LOG(info, "Dropping excluded message from " << getName()); } else { - // if no store then mark as enqueued - if (!enqueue(0, msg)){ - push(msg); - msg->enqueueComplete(); - }else { - push(msg); - } + enqueue(0, msg); + push(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -183,11 +183,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ if (policy.get()) policy->recoverEnqueued(msg); push(msg, true); - if (store){ + if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure - msg->addToSyncList(shared_from_this(), store); + msg->addToSyncList(shared_from_this(), store); } - msg->enqueueComplete(); // mark the message as enqueued if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { //content has not been loaded, need to ensure that lazy loading mode is set: @@ -211,14 +210,13 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ void Queue::requeue(const QueuedMessage& msg){ assertClusterSafe(); QueueListeners::NotificationSet copy; - { + { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; - msg.payload->enqueueComplete(); // mark the message as enqueued messages->reinsert(msg); listeners.populate(copy); - // for persistLastNode - don't force a message twice to disk, but force it if no force before + // for persistLastNode - don't force a message twice to disk, but force it if no force before if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { msg.payload->forcePersistent(); if (msg.payload->isForcedPersistent() ){ @@ -226,16 +224,17 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } + observeRequeue(msg, locker); } copy.notify(); } -bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - if (messages->remove(position, message)) { + if (acquire(position, message, locker)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; } else { @@ -244,9 +243,24 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess } } -bool Queue::acquire(const QueuedMessage& msg) { - QueuedMessage copy = msg; - return acquireMessageAt(msg.position, copy); +bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) +{ + Mutex::ScopedLock locker(messageLock); + assertClusterSafe(); + QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position); + + if (!allocator->allocate( consumer, msg )) { + QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); + return false; + } + + QueuedMessage copy(msg); + if (acquire( msg.position, copy, locker)) { + QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name); + return true; + } + QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position"); + return false; } void Queue::notifyListener() @@ -262,7 +276,7 @@ void Queue::notifyListener() set.notify(); } -bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { checkNotDeleted(); if (c->preAcquires()) { @@ -274,52 +288,71 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) case NO_MESSAGES: default: return false; - } + } } else { return browseNextMessage(m, c); } } -Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + QueuedMessage msg; + + if (!allocator->nextConsumableMessage(c, msg)) { // no next available + QPID_LOG(debug, "No messages available to dispatch to consumer " << + c->getName() << " on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; - } else { - QueuedMessage msg = messages->front(); - if (msg.payload->hasExpired()) { - QPID_LOG(debug, "Message expired from queue '" << name << "'"); - popAndDequeue(); - continue; - } + } - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { - m = msg; - pop(); - return CONSUMED; - } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - return CANT_CONSUME; - } + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + c->position = msg.position; + acquire( msg.position, msg, locker); + dequeue( 0, msg ); + continue; + } + + // a message is available for this consumer - can the consumer use it? + + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { + bool ok = allocator->allocate( c->getName(), msg ); // inform allocator + (void) ok; assert(ok); + ok = acquire( msg.position, msg, locker); + (void) ok; assert(ok); + m = msg; + c->position = m.position; + return CONSUMED; } else { - //consumer will never want this message - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); return CANT_CONSUME; - } + } + } else { + //consumer will never want this message + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + c->position = msg.position; + return CANT_CONSUME; } } } - -bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { - QueuedMessage msg(this); - while (seek(msg, c)) { + while (true) { + Mutex::ScopedLock locker(messageLock); + QueuedMessage msg; + + if (!allocator->nextBrowsableMessage(c, msg)) { // no next available + QPID_LOG(debug, "No browsable messages available for consumer " << + c->getName() << " on queue '" << name << "'"); + listeners.addListener(c); + return false; + } + if (c->filter(msg.payload) && !msg.payload->hasExpired()) { if (c->accept(msg.payload)) { //consumer wants the message @@ -333,8 +366,8 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) } } else { //consumer will never want this message, continue seeking - c->position = msg.position; QPID_LOG(debug, "Browser skipping message from '" << name << "'"); + c->position = msg.position; } } return false; @@ -364,61 +397,71 @@ bool Queue::dispatch(Consumer::shared_ptr c) } } -// Find the next message -bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { - Mutex::ScopedLock locker(messageLock); - if (messages->next(c->position, msg)) { - return true; - } else { - listeners.addListener(c); - return false; - } -} - -QueuedMessage Queue::find(SequenceNumber pos) const { +bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const { Mutex::ScopedLock locker(messageLock); - QueuedMessage msg; - messages->find(pos, msg); - return msg; + if (messages->find(pos, msg)) + return true; + return false; } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ assertClusterSafe(); - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { + { + Mutex::ScopedLock locker(consumerLock); + if(exclusive) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(consumerCount) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } + } + consumerCount++; + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); } } - consumerCount++; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); - //reset auto deletion timer if necessary - if (autoDeleteTimeout && autoDeleteTask) { - autoDeleteTask->cancel(); + Mutex::ScopedLock locker(messageLock); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerAdded(*c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); + } } } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); - Mutex::ScopedLock locker(consumerLock); - consumerCount--; - if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); + { + Mutex::ScopedLock locker(consumerLock); + consumerCount--; + if(exclusive) exclusive = 0; + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); + } + Mutex::ScopedLock locker(messageLock); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerRemoved(*c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); + } + } } QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - messages->pop(msg); + if (messages->pop(msg)) + observeAcquire(msg, locker); return msg; } @@ -432,22 +475,135 @@ bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& messa } } -void Queue::purgeExpired() +/** + *@param lapse: time since the last purgeExpired + */ +void Queue::purgeExpired(qpid::sys::Duration lapse) { //As expired messages are discarded during dequeue also, only //bother explicitly expiring if the rate of dequeues since last - //attempt is less than one per second. - - if (dequeueTracker.sampleRatePerSecond() < 1) { + //attempt is less than one per second. + int count = dequeueSincePurge.get(); + dequeueSincePurge -= count; + int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; + if (seconds == 0 || count / seconds < 1) { std::deque<QueuedMessage> expired; { Mutex::ScopedLock locker(messageLock); - messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); + messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); + } + + for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); + i != expired.end(); ++i) { + { + Mutex::ScopedLock locker(messageLock); + observeAcquire(*i, locker); + } + dequeue( 0, *i ); } - for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } } + +namespace { + // for use with purge/move below - collect messages that match a given filter + // + class MessageFilter + { + public: + static const std::string typeKey; + static const std::string paramsKey; + static MessageFilter *create( const ::qpid::types::Variant::Map *filter ); + virtual bool match( const QueuedMessage& ) const { return true; } + virtual ~MessageFilter() {} + protected: + MessageFilter() {}; + }; + const std::string MessageFilter::typeKey("filter_type"); + const std::string MessageFilter::paramsKey("filter_params"); + + // filter by message header string value exact match + class HeaderMatchFilter : public MessageFilter + { + public: + /* Config: + { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "<header name>", + 'header_value' : "<value to match>" + } + } + */ + static const std::string typeKey; + static const std::string headerKey; + static const std::string valueKey; + HeaderMatchFilter( const std::string& _header, const std::string& _value ) + : MessageFilter (), header(_header), value(_value) {} + bool match( const QueuedMessage& msg ) const + { + const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders(); + if (!headers) return false; + FieldTable::ValuePtr h = headers->get(header); + if (!h || !h->convertsTo<std::string>()) return false; + return h->get<std::string>() == value; + } + private: + const std::string header; + const std::string value; + }; + const std::string HeaderMatchFilter::typeKey("header_match_str"); + const std::string HeaderMatchFilter::headerKey("header_key"); + const std::string HeaderMatchFilter::valueKey("header_value"); + + // factory to create correct filter based on map + MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter ) + { + using namespace qpid::types; + if (filter && !filter->empty()) { + Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey); + if (i != filter->end()) { + + if (i->second.asString() == HeaderMatchFilter::typeKey) { + Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey); + if (p != filter->end() && p->second.getType() == VAR_MAP) { + Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey); + Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey); + if (k != p->second.asMap().end() && v != p->second.asMap().end()) { + std::string headerKey(k->second.asString()); + std::string value(v->second.asString()); + QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value ); + return new HeaderMatchFilter( headerKey, value ); + } + } + } + } + QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'"); + } + return new MessageFilter(); + } + + // used by removeIf() to collect all messages matching a filter, maximum match count is + // optional. + struct Collector { + const uint32_t maxMatches; + MessageFilter& filter; + std::deque<QueuedMessage> matches; + Collector(MessageFilter& filter, uint32_t max) + : maxMatches(max), filter(filter) {} + bool operator() (QueuedMessage& qm) + { + if (maxMatches == 0 || matches.size() < maxMatches) { + if (filter.match( qm )) { + matches.push_back(qm); + return true; + } + } + return false; + } + }; + +} // end namespace + + /** * purge - for purging all or some messages on a queue * depending on the purge_request @@ -459,63 +615,77 @@ void Queue::purgeExpired() * The dest exchange may be supplied to re-route messages through the exchange. * It is safe to re-route messages such that they arrive back on the same queue, * even if the queue is ordered by priority. + * + * An optional filter can be supplied that will be applied against each message. The + * message is purged only if the filter matches. See MessageDistributor for more detail. */ -uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest) +uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest, + const qpid::types::Variant::Map *filter) { - Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 - std::deque<DeliverableMessage> rerouteQueue; + std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); + Collector c(*mf.get(), purge_request); - uint32_t count = 0; - // Either purge them all or just the some (purge_count) while the queue isn't empty. - while((!purge_request || purge_count--) && !messages->empty()) { + Mutex::ScopedLock locker(messageLock); + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + // Update observers and message state: + observeAcquire(*qmsg, locker); + dequeue(0, *qmsg); + // now reroute if necessary if (dest.get()) { - // - // If there is a destination exchange, stage the messages onto a reroute queue - // so they don't wind up getting purged more than once. - // - DeliverableMessage msg(messages->front().payload); - rerouteQueue.push_back(msg); + assert(qmsg->payload); + DeliverableMessage dmsg(qmsg->payload); + dest->routeWithAlternate(dmsg); } - popAndDequeue(); - count++; } - - // - // Re-route purged messages into the destination exchange. Note that there's no need - // to test dest.get() here because if it is NULL, the rerouteQueue will be empty. - // - while (!rerouteQueue.empty()) { - DeliverableMessage msg(rerouteQueue.front()); - rerouteQueue.pop_front(); - dest->route(msg, msg.getMessage().getRoutingKey(), - msg.getMessage().getApplicationHeaders()); - } - - return count; + return c.matches.size(); } -uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { - Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 - uint32_t count = 0; // count how many were moved for returning +uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter) +{ + std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); + Collector c(*mf.get(), qty); - while((!qty || move_count--) && !messages->empty()) { - QueuedMessage qmsg = messages->front(); - boost::intrusive_ptr<Message> msg = qmsg.payload; - destq->deliver(msg); // deliver message to the destination queue - pop(); - dequeue(0, qmsg); - count++; + Mutex::ScopedLock locker(messageLock); + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + // Update observers and message state: + observeAcquire(*qmsg, locker); + dequeue(0, *qmsg); + // and move to destination Queue. + assert(qmsg->payload); + destq->deliver(qmsg->payload); } - return count; + return c.matches.size(); } -void Queue::pop() +/** Acquire the front (oldest) message from the in-memory queue. + * assumes messageLock held by caller + */ +void Queue::pop(const Mutex::ScopedLock& locker) { assertClusterSafe(); - messages->pop(); - ++dequeueTracker; + QueuedMessage msg; + if (messages->pop(msg)) { + observeAcquire(msg, locker); + ++dequeueSincePurge; + } +} + +/** Acquire the message at the given position, return true and msg if acquire succeeds */ +bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, + const Mutex::ScopedLock& locker) +{ + if (messages->remove(position, msg)) { + observeAcquire(msg, locker); + ++dequeueSincePurge; + return true; + } + return false; } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ @@ -524,13 +694,15 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ QueuedMessage removed; bool dequeueRequired = false; { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); - if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - + if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); + dequeueRequired = messages->push(qm, removed); + if (dequeueRequired) + observeAcquire(removed, locker); listeners.populate(copy); - enqueued(qm); + observeEnqueue(qm, locker); } copy.notify(); if (dequeueRequired) { @@ -546,7 +718,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) { - if (message.payload->isEnqueueComplete()) (*result)++; + if (message.payload->isIngressComplete()) (*result)++; } /** function only provided for unit tests, or code not in critical message path */ @@ -606,7 +778,7 @@ void Queue::setLastNodeFailure() } -// return true if store exists, +// return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck) { ScopedUse u(barrier); @@ -620,24 +792,21 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } - + if (traceId.size()) { - //copy on write: take deep copy of message before modifying it - //as the frames may already be available for delivery on other - //threads - boost::intrusive_ptr<Message> copy(new Message(*msg)); - msg = copy; msg->addTraceId(traceId); } if ((msg->isPersistent() || msg->checkContentReleasable()) && store) { - msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete() + // when it considers the message stored. + msg->enqueueAsync(shared_from_this(), store); boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; @@ -654,10 +823,10 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) { Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->enqueueAborted(msg); + if (policy.get()) policy->enqueueAborted(msg); } -// return true if store exists, +// return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { ScopedUse u(barrier); @@ -666,8 +835,8 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; - if (!ctxt) { - dequeued(msg); + if (!ctxt) { + observeDequeue(msg, locker); } } // This check prevents messages which have been forced persistent on one queue from dequeuing @@ -687,7 +856,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); - dequeued(msg); + observeDequeue(msg, locker); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); @@ -695,21 +864,23 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) } /** - * Removes a message from the in-memory delivery queue as well - * dequeing it from the logical (and persistent if applicable) queue + * Removes the first (oldest) message from the in-memory delivery queue as well dequeing + * it from the logical (and persistent if applicable) queue */ -void Queue::popAndDequeue() +void Queue::popAndDequeue(const Mutex::ScopedLock& held) { - QueuedMessage msg = messages->front(); - pop(); - dequeue(0, msg); + if (!messages->empty()) { + QueuedMessage msg = messages->front(); + pop(held); + dequeue(0, msg); + } } /** * Updates policy and management when a message has been dequeued, * expects messageLock to be held */ -void Queue::dequeued(const QueuedMessage& msg) +void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); @@ -722,6 +893,33 @@ void Queue::dequeued(const QueuedMessage& msg) } } +/** updates queue observers when a message has become unavailable for transfer, + * expects messageLock to be held + */ +void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->acquired(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what()); + } + } +} + +/** updates queue observers when a message has become re-available for transfer, + * expects messageLock to be held + */ +void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->requeued(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what()); + } + } +} void Queue::create(const FieldTable& _settings) { @@ -729,7 +927,7 @@ void Queue::create(const FieldTable& _settings) if (store) { store->create(*this, _settings); } - configure(_settings); + configureImpl(_settings); } @@ -742,8 +940,8 @@ int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::stri return v->get<int>(); } else if (v->convertsTo<std::string>()){ std::string s = v->get<std::string>(); - try { - return boost::lexical_cast<int>(s); + try { + return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) { QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); return 0; @@ -754,15 +952,45 @@ int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::stri } } -void Queue::configure(const FieldTable& _settings, bool recovering) +bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string& key) { + qpid::framing::FieldTable::ValuePtr v = settings.get(key); + if (!v) { + return false; + } else if (v->convertsTo<int>()) { + return v->get<int>() != 0; + } else if (v->convertsTo<std::string>()){ + std::string s = v->get<std::string>(); + if (s == "True") return true; + if (s == "true") return true; + if (s == "False") return false; + if (s == "false") return false; + try { + return boost::lexical_cast<bool>(s); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << s); + return false; + } + } else { + QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << *v); + return false; + } +} + +void Queue::configure(const FieldTable& _settings) +{ + settings = _settings; + configureImpl(settings); +} +void Queue::configureImpl(const FieldTable& _settings) +{ eventMode = _settings.getAsInt(qpidQueueEventGeneration); if (eventMode && broker) { broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY); } - if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && + if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) { if ( NullMessageStore::isNullStore(store)) { QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); @@ -776,32 +1004,43 @@ void Queue::configure(const FieldTable& _settings, bool recovering) setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); } if (broker && broker->getManagementAgent()) { - ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings); + ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings, broker->getOptions().queueThresholdEventRatio); } //set this regardless of owner to allow use of no-local with exclusive consumers also - noLocal = _settings.get(qpidNoLocal); + noLocal = getBoolSetting(_settings, qpidNoLocal); QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey); if (lvqKey.size()) { QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); - } else if (_settings.get(qpidLastValueQueueNoBrowse)) { + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); + } else if (getBoolSetting(_settings, qpidLastValueQueueNoBrowse)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); - } else if (_settings.get(qpidLastValueQueue)) { + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); + } else if (getBoolSetting(_settings, qpidLastValueQueue)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); } else { std::auto_ptr<Messages> m = Fairshare::create(_settings); if (m.get()) { messages = m; + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); + } else { // default (FIFO) queue type + // override default message allocator if message groups configured. + boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings)); + if (mgm) { + allocator = mgm; + addObserver(mgm); + } } } - - persistLastNode= _settings.get(qpidPersistLastNode); + + persistLastNode = getBoolSetting(_settings, qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName()); traceId = _settings.getAsString(qpidTraceIdentity); @@ -809,32 +1048,32 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (excludeList.size()) { split(traceExclude, excludeList, ", "); } - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); - if (autoDeleteTimeout) - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); + if (autoDeleteTimeout) + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); + } - if ( isDurable() && ! getPersistenceId() && ! recovering ) - store->create(*this, _settings); + QueueFlowLimit::observe(*this, _settings); } -void Queue::destroy() +void Queue::destroyed() { + unbind(broker->getExchanges()); if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); while(!messages->empty()){ DeliverableMessage msg(messages->front().payload); - alternateExchange->route(msg, msg.getMessage().getRoutingKey(), - msg.getMessage().getApplicationHeaders()); - popAndDequeue(); + alternateExchange->routeWithAlternate(msg); + popAndDequeue(locker); } alternateExchange->decAlternateUsers(); } @@ -846,6 +1085,7 @@ void Queue::destroy() store = 0;//ensure we make no more calls to the store for this queue } if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); + notifyDeleted(); } void Queue::notifyDeleted() @@ -865,9 +1105,9 @@ void Queue::bound(const string& exchange, const string& key, bindings.add(exchange, key, args); } -void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref) +void Queue::unbind(ExchangeRegistry& exchanges) { - bindings.unbind(exchanges, shared_ref); + bindings.unbind(exchanges, shared_from_this()); } void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) @@ -880,9 +1120,9 @@ const QueuePolicy* Queue::getPolicy() return policy.get(); } -uint64_t Queue::getPersistenceId() const -{ - return persistenceId; +uint64_t Queue::getPersistenceId() const +{ + return persistenceId; } void Queue::setPersistenceId(uint64_t _persistenceId) const @@ -896,11 +1136,11 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const persistenceId = _persistenceId; } -void Queue::encode(Buffer& buffer) const +void Queue::encode(Buffer& buffer) const { buffer.putShortString(name); buffer.put(settings); - if (policy.get()) { + if (policy.get()) { buffer.put(*policy); } buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string("")); @@ -914,13 +1154,14 @@ uint32_t Queue::encodedSize() const + (policy.get() ? (*policy).encodedSize() : 0); } -Queue::shared_ptr Queue::decode ( QueueRegistry& queues, Buffer& buffer, bool recovering ) +Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer ) { string name; buffer.getShortString(name); - std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true); - buffer.get(result.first->settings); - result.first->configure(result.first->settings, recovering ); + FieldTable settings; + buffer.get(settings); + boost::shared_ptr<Exchange> alternate; + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true, false, 0, alternate, settings, true); if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) { buffer.get ( *(result.first->policy) ); } @@ -952,11 +1193,10 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) { - if (broker.getQueues().destroyIf(queue->getName(), + if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { QPID_LOG(debug, "Auto-deleting " << queue->getName()); - queue->unbind(broker.getExchanges(), queue); - queue->destroy(); + queue->destroyed(); } } @@ -965,7 +1205,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask Broker& broker; Queue::shared_ptr queue; - AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) + AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} void fire() @@ -983,27 +1223,27 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) if (queue->autoDeleteTimeout && queue->canAutoDelete()) { AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time)); - broker.getClusterTimer().add(queue->autoDeleteTask); + broker.getClusterTimer().add(queue->autoDeleteTask); QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); } else { tryAutoDeleteImpl(broker, queue); } } -bool Queue::isExclusiveOwner(const OwnershipToken* const o) const -{ +bool Queue::isExclusiveOwner(const OwnershipToken* const o) const +{ Mutex::ScopedLock locker(ownershipLock); - return o == owner; + return o == owner; } -void Queue::releaseExclusiveOwnership() -{ +void Queue::releaseExclusiveOwnership() +{ Mutex::ScopedLock locker(ownershipLock); - owner = 0; + owner = 0; } -bool Queue::setExclusiveOwner(const OwnershipToken* const o) -{ +bool Queue::setExclusiveOwner(const OwnershipToken* const o) +{ //reset auto deletion timer if necessary if (autoDeleteTimeout && autoDeleteTask) { autoDeleteTask->cancel(); @@ -1012,25 +1252,25 @@ bool Queue::setExclusiveOwner(const OwnershipToken* const o) if (owner) { return false; } else { - owner = o; + owner = o; return true; } } -bool Queue::hasExclusiveOwner() const -{ +bool Queue::hasExclusiveOwner() const +{ Mutex::ScopedLock locker(ownershipLock); - return owner != 0; + return owner != 0; } -bool Queue::hasExclusiveConsumer() const -{ - return exclusive; +bool Queue::hasExclusiveConsumer() const +{ + return exclusive; } void Queue::setExternalQueueStore(ExternalQueueStore* inst) { - if (externalQueueStore!=inst && externalQueueStore) - delete externalQueueStore; + if (externalQueueStore!=inst && externalQueueStore) + delete externalQueueStore; externalQueueStore = inst; if (inst) { @@ -1055,7 +1295,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str case _qmf::Queue::METHOD_PURGE : { _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args; - purge(purgeArgs.i_request); + purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter); status = Manageable::STATUS_OK; } break; @@ -1076,7 +1316,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str } } - purge(rerouteArgs.i_request, dest); + purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter); status = Manageable::STATUS_OK; } break; @@ -1085,6 +1325,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str return status; } + +void Queue::query(qpid::types::Variant::Map& results) const +{ + Mutex::ScopedLock locker(messageLock); + /** @todo add any interesting queue state into results */ + if (allocator) allocator->query(results); +} + void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; @@ -1119,7 +1367,10 @@ void Queue::insertSequenceNumbers(const std::string& key) QPID_LOG(debug, "Inserting sequence numbers as " << key); } -void Queue::enqueued(const QueuedMessage& m) +/** updates queue observers and state when a message has become available for transfer, + * expects messageLock to be held + */ +void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) { for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { try { @@ -1142,7 +1393,8 @@ void Queue::updateEnqueued(const QueuedMessage& m) if (policy.get()) { policy->recoverEnqueued(payload); } - enqueued(m); + Mutex::ScopedLock locker(messageLock); + observeEnqueue(m, locker); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1166,6 +1418,7 @@ void Queue::checkNotDeleted() void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) { + Mutex::ScopedLock locker(messageLock); observers.insert(observer); } @@ -1175,6 +1428,32 @@ void Queue::flush() if (u.acquired && store) store->flush(*this); } + +bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, + const qpid::framing::FieldTable& arguments) +{ + if (exchange->bind(shared_from_this(), key, &arguments)) { + bound(exchange->getName(), key, arguments); + if (exchange->isDurable() && isDurable()) { + store->bind(*exchange, *this, key, arguments); + } + return true; + } else { + return false; + } +} + + +const Broker* Queue::getBroker() +{ + return broker; +} + +void Queue::setDequeueSincePurge(uint32_t value) { + dequeueSincePurge = value; +} + + Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() |