diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 138 |
1 files changed, 63 insertions, 75 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 08ee133981..c3b14688d6 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -56,7 +56,7 @@ using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; -namespace +namespace { const std::string qpidMaxSize("qpid.max_size"); const std::string qpidMaxCount("qpid.max_count"); @@ -76,16 +76,16 @@ const int ENQUEUE_ONLY=1; const int ENQUEUE_AND_DEQUEUE=2; } -Queue::Queue(const string& _name, bool _autodelete, +Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, Manageable* parent, Broker* b) : - name(_name), + name(_name), autodelete(_autodelete), store(_store), - owner(_owner), + owner(_owner), consumerCount(0), exclusive(0), noLocal(false), @@ -182,9 +182,9 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg, true); - if (store){ + if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure - msg->addToSyncList(shared_from_this(), store); + msg->addToSyncList(shared_from_this(), store); } msg->enqueueComplete(); // mark the message as enqueued mgntEnqStats(msg); @@ -192,7 +192,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this - msg->releaseContent(store); + msg->releaseContent(true); } } @@ -209,13 +209,13 @@ void Queue::requeue(const QueuedMessage& msg){ if (!isEnqueued(msg)) return; QueueListeners::NotificationSet copy; - { + { Mutex::ScopedLock locker(messageLock); msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); listeners.populate(copy); - // for persistLastNode - don't force a message twice to disk, but force it if no force before + // for persistLastNode - don't force a message twice to disk, but force it if no force before if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { msg.payload->forcePersistent(); if (msg.payload->isForcedPersistent() ){ @@ -234,7 +234,7 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ } } -bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) +bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); QPID_LOG(debug, "Attempting to acquire message at " << position); @@ -258,7 +258,7 @@ bool Queue::acquire(const QueuedMessage& msg) { QPID_LOG(debug, "attempting to acquire " << msg.position); for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set - || (lastValueQueue && (i->position == msg.position) && + || (lastValueQueue && (i->position == msg.position) && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) { clearLVQIndex(msg); @@ -296,7 +296,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) case NO_MESSAGES: default: return false; - } + } } else { return browseNextMessage(m, c); } @@ -317,7 +317,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) //enqueued and so is not available for consumption yet, //register consumer for notification when this changes listeners.addListener(c); - return false; + return false; } else { //check that consumer has sufficient credit for the //message (if it does not, no need to register it for @@ -332,7 +332,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { + if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; @@ -345,7 +345,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ } if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { + if (c->accept(msg.payload)) { m = msg; popMsg(msg); return CONSUMED; @@ -358,7 +358,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ //consumer will never want this message QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); return CANT_CONSUME; - } + } } } } @@ -423,7 +423,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { if (c->position < getFront().position) { msg = getFront(); return true; - } else { + } else { //TODO: can improve performance of this search, for now just searching linearly from end Messages::reverse_iterator pos; for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) { @@ -524,7 +524,7 @@ void Queue::purgeExpired() */ uint32_t Queue::purge(const uint32_t purge_request){ Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 + uint32_t purge_count = purge_request; // only comes into play if >0 uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. @@ -537,7 +537,7 @@ uint32_t Queue::purge(const uint32_t purge_request){ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 + uint32_t move_count = qty; // only comes into play if qty >0 uint32_t count = 0; // count how many were moved for returning while((!qty || move_count--) && !messages.empty()) { @@ -566,15 +566,16 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ Messages dequeues; QueueListeners::NotificationSet copy; { - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); + msg->setStore(store); if (policy.get()) { policy->tryEnqueue(qm); //depending on policy, may have some dequeues if (!isRecovery) pendingDequeues.swap(dequeues); } if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - + LVQ::iterator i; const framing::FieldTable* ft = msg->getApplicationHeaders(); if (lastValueQueue && ft){ @@ -584,7 +585,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (i == lvq.end() || msg->isUpdateMessage()){ messages.push_back(qm); listeners.populate(copy); - lvq[key] = msg; + lvq[key] = msg; }else { boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); if (!old) old = i->second; @@ -594,10 +595,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ //recovery is complete pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); } else { - Mutex::ScopedUnlock u(messageLock); + Mutex::ScopedUnlock u(messageLock); dequeue(0, QueuedMessage(qm.queue, old, qm.position)); } - } + } }else { messages.push_back(qm); listeners.populate(copy); @@ -632,8 +633,8 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) if (ft) { string key = ft->getAsString(qpidVQMatchProperty); if (lvq.find(key) != lvq.end()){ - lvq[key] = replacement; - } + lvq[key] = replacement; + } } msg.payload = replacement; } @@ -644,7 +645,7 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) uint32_t Queue::getMessageCount() const { Mutex::ScopedLock locker(messageLock); - + uint32_t count = 0; for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { //NOTE: don't need to use checkLvqReplace() here as it @@ -652,7 +653,7 @@ uint32_t Queue::getMessageCount() const //so the enqueueComplete check has no effect if ( i->payload->isEnqueueComplete() ) count ++; } - + return count; } @@ -696,13 +697,13 @@ void Queue::setLastNodeFailure() } } -// return true if store exists, +// return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } - + if (traceId.size()) { msg->addTraceId(traceId); } @@ -716,13 +717,13 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) return false; } -// return true if store exists, +// return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { { Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; - if (!ctxt) { + if (!ctxt) { dequeued(msg); } } @@ -738,7 +739,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); - dequeued(msg); + dequeued(msg); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); @@ -794,7 +795,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering) QPID_LOG(debug, "Configured queue as Last Value Queue No Browse"); lastValueQueue = lastValueQueueNoBrowse; } - + persistLastNode= _settings.get(qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); @@ -803,7 +804,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (excludeList.size()) { split(traceExclude, excludeList, ", "); } - QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); eventMode = _settings.getAsInt(qpidQueueEventGeneration); @@ -859,9 +860,9 @@ const QueuePolicy* Queue::getPolicy() return policy.get(); } -uint64_t Queue::getPersistenceId() const -{ - return persistenceId; +uint64_t Queue::getPersistenceId() const +{ + return persistenceId; } void Queue::setPersistenceId(uint64_t _persistenceId) const @@ -880,18 +881,18 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const persistenceId = _persistenceId; } -void Queue::encode(Buffer& buffer) const +void Queue::encode(Buffer& buffer) const { buffer.putShortString(name); buffer.put(settings); - if (policy.get()) { + if (policy.get()) { buffer.put(*policy); } } uint32_t Queue::encodedSize() const { - return name.size() + 1/*short string size octet*/ + settings.encodedSize() + return name.size() + 1/*short string size octet*/ + settings.encodedSize() + (policy.get() ? (*policy).encodedSize() : 0); } @@ -922,50 +923,50 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) { - if (broker.getQueues().destroyIf(queue->getName(), + if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { queue->unbind(broker.getExchanges(), queue); queue->destroy(); } } -bool Queue::isExclusiveOwner(const OwnershipToken* const o) const -{ +bool Queue::isExclusiveOwner(const OwnershipToken* const o) const +{ Mutex::ScopedLock locker(ownershipLock); - return o == owner; + return o == owner; } -void Queue::releaseExclusiveOwnership() -{ +void Queue::releaseExclusiveOwnership() +{ Mutex::ScopedLock locker(ownershipLock); - owner = 0; + owner = 0; } -bool Queue::setExclusiveOwner(const OwnershipToken* const o) -{ +bool Queue::setExclusiveOwner(const OwnershipToken* const o) +{ Mutex::ScopedLock locker(ownershipLock); if (owner) { return false; } else { - owner = o; + owner = o; return true; } } -bool Queue::hasExclusiveOwner() const -{ +bool Queue::hasExclusiveOwner() const +{ Mutex::ScopedLock locker(ownershipLock); - return owner != 0; + return owner != 0; } -bool Queue::hasExclusiveConsumer() const -{ - return exclusive; +bool Queue::hasExclusiveConsumer() const +{ + return exclusive; } void Queue::setExternalQueueStore(ExternalQueueStore* inst) { - if (externalQueueStore!=inst && externalQueueStore) - delete externalQueueStore; + if (externalQueueStore!=inst && externalQueueStore) + delete externalQueueStore; externalQueueStore = inst; if (inst) { @@ -975,19 +976,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } -bool Queue::releaseMessageContent(const QueuedMessage& m) -{ - if (store && !NullMessageStore::isNullStore(store)) { - QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); - m.payload->releaseContent(store); - return true; - } else { - QPID_LOG(warning, "Message " << m.position << " on " << name - << " cannot be released from memory as the queue is not durable"); - return false; - } -} - ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; @@ -1062,7 +1050,7 @@ bool Queue::isEnqueued(const QueuedMessage& msg) void Queue::addPendingDequeue(const QueuedMessage& msg) { //assumes lock is held - true at present but rather nasty as this is a public method - pendingDequeues.push_back(msg); + pendingDequeues.push_back(msg); } QueueListeners& Queue::getListeners() { return listeners; } |