diff options
Diffstat (limited to 'RC9/qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | RC9/qpid/cpp/src/qpid/broker/Queue.cpp | 900 |
1 files changed, 900 insertions, 0 deletions
diff --git a/RC9/qpid/cpp/src/qpid/broker/Queue.cpp b/RC9/qpid/cpp/src/qpid/broker/Queue.cpp new file mode 100644 index 0000000000..9089ba0c54 --- /dev/null +++ b/RC9/qpid/cpp/src/qpid/broker/Queue.cpp @@ -0,0 +1,900 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Broker.h" +#include "Queue.h" +#include "Exchange.h" +#include "DeliverableMessage.h" +#include "MessageStore.h" +#include "NullMessageStore.h" +#include "QueueRegistry.h" + +#include "qpid/StringUtils.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" +#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" + +#include <iostream> +#include <algorithm> +#include <functional> + +#include <boost/bind.hpp> +#include <boost/intrusive_ptr.hpp> + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using std::for_each; +using std::mem_fun; +namespace _qmf = qmf::org::apache::qpid::broker; + + +namespace +{ +const std::string qpidMaxSize("qpid.max_size"); +const std::string qpidMaxCount("qpid.max_count"); +const std::string qpidNoLocal("no-local"); +const std::string qpidTraceIdentity("qpid.trace.id"); +const std::string qpidTraceExclude("qpid.trace.exclude"); +const std::string qpidLastValueQueue("qpid.last_value_queue"); +const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); +const std::string qpidPersistLastNode("qpid.persist_last_node"); +const std::string qpidVQMatchProperty("qpid.LVQ_key"); +} + + +Queue::Queue(const string& _name, bool _autodelete, + MessageStore* const _store, + const OwnershipToken* const _owner, + Manageable* parent) : + + name(_name), + autodelete(_autodelete), + store(_store), + owner(_owner), + consumerCount(0), + exclusive(0), + noLocal(false), + lastValueQueue(false), + lastValueQueueNoBrowse(false), + persistLastNode(false), + inLastNodeFailure(false), + persistenceId(0), + policyExceeded(false), + mgmtObject(0) +{ + if (parent != 0) + { + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + + if (agent != 0) + { + mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0); + + // Add the object to the management agent only if this queue is not durable. + // If it's durable, we will add it later when the queue is assigned a persistenceId. + if (store == 0) + agent->addObject (mgmtObject); + } + } +} + +Queue::~Queue() +{ + if (mgmtObject != 0) + mgmtObject->resourceDestroy (); +} + +void Queue::notifyDurableIOComplete() +{ + QueueListeners::NotificationSet copy; + { + Mutex::ScopedLock locker(messageLock); + listeners.populate(copy); + } + copy.notify(); +} + +bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) +{ + return token && token->isLocal(msg->getPublisher()); +} + +bool Queue::isLocal(boost::intrusive_ptr<Message>& msg) +{ + //message is considered local if it was published on the same + //connection as that of the session which declared this queue + //exclusive (owner) or which has an exclusive subscription + //(exclusive) + return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg)); +} + +bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg) +{ + return traceExclude.size() && msg->isExcluded(traceExclude); +} + +void Queue::deliver(boost::intrusive_ptr<Message>& msg){ + + if (msg->isImmediate() && getConsumerCount() == 0) { + if (alternateExchange) { + DeliverableMessage deliverable(msg); + alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); + } + } else if (isLocal(msg)) { + //drop message + QPID_LOG(info, "Dropping 'local' message from " << getName()); + } else if (isExcluded(msg)) { + //drop message + QPID_LOG(info, "Dropping excluded message from " << getName()); + } else { + // if no store then mark as enqueued + if (!enqueue(0, msg)){ + push(msg); + msg->enqueueComplete(); + }else { + push(msg); + } + mgntEnqStats(msg); + QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); + } +} + + +void Queue::recover(boost::intrusive_ptr<Message>& msg){ + push(msg); + msg->enqueueComplete(); // mark the message as enqueued + mgntEnqStats(msg); + + if (store && !msg->isContentLoaded()) { + //content has not been loaded, need to ensure that lazy loading mode is set: + //TODO: find a nicer way to do this + msg->releaseContent(store); + } +} + +void Queue::process(boost::intrusive_ptr<Message>& msg){ + push(msg); + mgntEnqStats(msg); + if (mgmtObject != 0){ + mgmtObject->inc_msgTxnEnqueues (); + mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + } +} + +void Queue::requeue(const QueuedMessage& msg){ + if (policy.get() && !policy->isEnqueued(msg)) return; + + QueueListeners::NotificationSet copy; + { + Mutex::ScopedLock locker(messageLock); + msg.payload->enqueueComplete(); // mark the message as enqueued + messages.push_front(msg); + listeners.populate(copy); + } + copy.notify(); +} + +void Queue::clearLVQIndex(const QueuedMessage& msg){ + if (lastValueQueue){ + const framing::FieldTable* ft = msg.payload->getApplicationHeaders(); + string key = ft->getAsString(qpidVQMatchProperty); + lvq.erase(key); + } +} + +bool Queue::acquire(const QueuedMessage& msg) { + Mutex::ScopedLock locker(messageLock); + QPID_LOG(debug, "attempting to acquire " << msg.position); + for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { + if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set + || (lastValueQueue && (i->position == msg.position) && + msg.payload.get() == checkLvqReplace(*i).payload.get()) ) { + + clearLVQIndex(msg); + messages.erase(i); + QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); + return true; + } else { + QPID_LOG(debug, "No match: " << i->position << " != " << msg.position); + } + } + QPID_LOG(debug, "Acquire failed for " << msg.position); + return false; +} + +bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +{ + if (c->preAcquires()) { + return consumeNextMessage(m, c); + } else { + return browseNextMessage(m, c); + } +} + +bool Queue::checkForMessages(Consumer::shared_ptr c) +{ + Mutex::ScopedLock locker(messageLock); + if (messages.empty()) { + //no message available, register consumer for notification + //when this changes + listeners.addListener(c); + return false; + } else { + QueuedMessage msg = getFront(); + if (store && !msg.payload->isEnqueueComplete()) { + //though a message is on the queue, it has not yet been + //enqueued and so is not available for consumption yet, + //register consumer for notification when this changes + listeners.addListener(c); + return false; + } else { + //check that consumer has sufficient credit for the + //message (if it does not, no need to register it for + //notification as the consumer itself will handle the + //credit allocation required to change this condition). + return c->accept(msg.payload); + } + } +} + +bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +{ + while (true) { + Mutex::ScopedLock locker(messageLock); + if (messages.empty()) { + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + listeners.addListener(c); + return false; + } else { + QueuedMessage msg = getFront(); + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + popAndDequeue(); + continue; + } + + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { + m = msg; + popMsg(msg); + return true; + } else { + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + return false; + } + } else { + //consumer will never want this message + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + return false; + } + } + } +} + + +bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +{ + QueuedMessage msg(this); + while (seek(msg, c)) { + if (c->filter(msg.payload) && !msg.payload->hasExpired()) { + if (c->accept(msg.payload)) { + //consumer wants the message + c->position = msg.position; + m = msg; + if (!lastValueQueueNoBrowse) clearLVQIndex(msg); + if (lastValueQueue) { + boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); + if (replacement.get()) m.payload = replacement; + } + return true; + } else { + //browser hasn't got enough credit for the message + QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'"); + return false; + } + } else { + //consumer will never want this message, continue seeking + c->position = msg.position; + QPID_LOG(debug, "Browser skipping message from '" << name << "'"); + } + } + return false; +} + +void Queue::removeListener(Consumer::shared_ptr c) +{ + QueueListeners::NotificationSet set; + { + Mutex::ScopedLock locker(messageLock); + listeners.removeListener(c); + if (messages.size()) { + listeners.populate(set); + } + } + set.notify(); +} + +bool Queue::dispatch(Consumer::shared_ptr c) +{ + QueuedMessage msg(this); + if (getNextMessage(msg, c)) { + c->deliver(msg); + return true; + } else { + return false; + } +} + +bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { + Mutex::ScopedLock locker(messageLock); + if (!messages.empty() && messages.back().position > c->position) { + if (c->position < getFront().position) { + msg = getFront(); + return true; + } else { + //TODO: can improve performance of this search, for now just searching linearly from end + Messages::reverse_iterator pos; + for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) { + pos = i; + } + msg = *pos; + return true; + } + } + listeners.addListener(c); + return false; +} + +namespace { +struct PositionEquals { + SequenceNumber pos; + PositionEquals(SequenceNumber p) : pos(p) {} + bool operator()(const QueuedMessage& msg) const { return msg.position == pos; } +}; +}// namespace + +QueuedMessage Queue::find(SequenceNumber pos) const { + Mutex::ScopedLock locker(messageLock); + Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos)); + if (i != messages.end()) + return *i; + return QueuedMessage(); +} + +void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ + Mutex::ScopedLock locker(consumerLock); + if(exclusive) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(consumerCount) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } + } + consumerCount++; + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); +} + +void Queue::cancel(Consumer::shared_ptr c){ + removeListener(c); + Mutex::ScopedLock locker(consumerLock); + consumerCount--; + if(exclusive) exclusive = 0; + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); +} + +QueuedMessage Queue::get(){ + Mutex::ScopedLock locker(messageLock); + QueuedMessage msg(this); + + if(!messages.empty()){ + msg = getFront(); + popMsg(msg); + } + return msg; +} + +void Queue::purgeExpired() +{ + //As expired messages are discarded during dequeue also, only + //bother explicitly expiring if the rate of dequeues since last + //attempt is less than one per second. + if (dequeueTracker.sampleRatePerSecond() < 1) { + Messages expired; + { + Mutex::ScopedLock locker(messageLock); + for (Messages::iterator i = messages.begin(); i != messages.end();) { + if (lastValueQueue) checkLvqReplace(*i); + if (i->payload->hasExpired()) { + expired.push_back(*i); + i = messages.erase(i); + } else { + ++i; + } + } + } + for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + } +} + +/** + * purge - for purging all or some messages on a queue + * depending on the purge_request + * + * purge_request == 0 then purge all messages + * == N then purge N messages from queue + * Sometimes purge_request == 1 to unblock the top of queue + */ +uint32_t Queue::purge(const uint32_t purge_request){ + Mutex::ScopedLock locker(messageLock); + uint32_t purge_count = purge_request; // only comes into play if >0 + + uint32_t count = 0; + // Either purge them all or just the some (purge_count) while the queue isn't empty. + while((!purge_request || purge_count--) && !messages.empty()) { + popAndDequeue(); + count++; + } + return count; +} + +uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { + Mutex::ScopedLock locker(messageLock); + uint32_t move_count = qty; // only comes into play if qty >0 + uint32_t count = 0; // count how many were moved for returning + + while((!qty || move_count--) && !messages.empty()) { + QueuedMessage qmsg = getFront(); + boost::intrusive_ptr<Message> msg = qmsg.payload; + destq->deliver(msg); // deliver message to the destination queue + popMsg(qmsg); + dequeue(0, qmsg); + count++; + } + return count; +} + +void Queue::popMsg(QueuedMessage& qmsg) +{ + if (lastValueQueue){ + const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); + string key = ft->getAsString(qpidVQMatchProperty); + lvq.erase(key); + } + messages.pop_front(); + ++dequeueTracker; +} + +void Queue::push(boost::intrusive_ptr<Message>& msg){ + QueueListeners::NotificationSet copy; + { + Mutex::ScopedLock locker(messageLock); + QueuedMessage qm(this, msg, ++sequence); + if (policy.get()) policy->tryEnqueue(qm); + + LVQ::iterator i; + if (lastValueQueue){ + const framing::FieldTable* ft = msg->getApplicationHeaders(); + string key = ft->getAsString(qpidVQMatchProperty); + + i = lvq.find(key); + if (i == lvq.end()){ + messages.push_back(qm); + listeners.populate(copy); + lvq[key] = msg; + }else { + i->second->setReplacementMessage(msg,this); + qm.payload = i->second; + dequeued(qm); + } + }else { + messages.push_back(qm); + listeners.populate(copy); + } + } + copy.notify(); +} + +QueuedMessage Queue::getFront() +{ + QueuedMessage msg = messages.front(); + if (lastValueQueue) { + boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); + if (replacement.get()) msg.payload = replacement; + } + return msg; +} + +QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) const +{ + boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); + if (replacement.get()) msg.payload = replacement; + return msg; +} + +/** function only provided for unit tests, or code not in critical message path */ +uint32_t Queue::getMessageCount() const +{ + Mutex::ScopedLock locker(messageLock); + + uint32_t count = 0; + for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { + //NOTE: don't need to use checkLvqReplace() here as it + //is only relevant for LVQ which does not support persistence + //so the enqueueComplete check has no effect + if ( i->payload->isEnqueueComplete() ) count ++; + } + + return count; +} + +uint32_t Queue::getConsumerCount() const +{ + Mutex::ScopedLock locker(consumerLock); + return consumerCount; +} + +bool Queue::canAutoDelete() const +{ + Mutex::ScopedLock locker(consumerLock); + return autodelete && !consumerCount; +} + +void Queue::clearLastNodeFailure() +{ + inLastNodeFailure = false; +} + +void Queue::setLastNodeFailure() +{ + if (persistLastNode){ + Mutex::ScopedLock locker(messageLock); + for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { + if (lastValueQueue) checkLvqReplace(*i); + i->payload->forcePersistent(); + if (i->payload->getPersistenceId() == 0){ + enqueue(0, i->payload); + } + } + inLastNodeFailure = true; + } +} + +// return true if store exists, +bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) +{ + if (inLastNodeFailure && persistLastNode){ + msg->forcePersistent(); + } + + if (traceId.size()) { + msg->addTraceId(traceId); + } + + if (msg->isPersistent() && store && !lastValueQueue) { + msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); + store->enqueue(ctxt, pmsg, *this); + return true; + } + return false; +} + +// return true if store exists, +bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) +{ + if (policy.get() && !policy->isEnqueued(msg)) return false; + { + Mutex::ScopedLock locker(messageLock); + if (!ctxt) { + dequeued(msg); + } + } + if (msg.payload->isPersistent() && store && !lastValueQueue) { + msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); + store->dequeue(ctxt, pmsg, *this); + return true; + } + return false; +} + +void Queue::dequeueCommitted(const QueuedMessage& msg) +{ + Mutex::ScopedLock locker(messageLock); + dequeued(msg); + if (mgmtObject != 0) { + mgmtObject->inc_msgTxnDequeues(); + mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + } +} + +/** + * Removes a message from the in-memory delivery queue as well + * dequeing it from the logical (and persistent if applicable) queue + */ +void Queue::popAndDequeue() +{ + QueuedMessage msg = getFront(); + popMsg(msg); + dequeue(0, msg); +} + +/** + * Updates policy and management when a message has been dequeued, + * expects messageLock to be held + */ +void Queue::dequeued(const QueuedMessage& msg) +{ + if (policy.get()) policy->dequeued(msg); + mgntDeqStats(msg.payload); +} + + +void Queue::create(const FieldTable& _settings) +{ + settings = _settings; + if (store) { + store->create(*this, _settings); + } + configure(_settings); +} + +void Queue::configure(const FieldTable& _settings) +{ + setPolicy(QueuePolicy::createQueuePolicy(_settings)); + //set this regardless of owner to allow use of no-local with exclusive consumers also + noLocal = _settings.get(qpidNoLocal); + QPID_LOG(debug, "Configured queue with no-local=" << noLocal); + + lastValueQueue= _settings.get(qpidLastValueQueue); + if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue"); + + lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse); + if (lastValueQueueNoBrowse){ + QPID_LOG(debug, "Configured queue as Last Value Queue No Browse"); + lastValueQueue = lastValueQueueNoBrowse; + } + + persistLastNode= _settings.get(qpidPersistLastNode); + if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); + + traceId = _settings.getAsString(qpidTraceIdentity); + std::string excludeList = _settings.getAsString(qpidTraceExclude); + if (excludeList.size()) { + split(traceExclude, excludeList, ", "); + } + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId + << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); + + if (mgmtObject != 0) + mgmtObject->set_arguments (_settings); +} + +void Queue::destroy() +{ + if (alternateExchange.get()) { + Mutex::ScopedLock locker(messageLock); + while(!messages.empty()){ + DeliverableMessage msg(getFront().payload); + alternateExchange->route(msg, msg.getMessage().getRoutingKey(), + msg.getMessage().getApplicationHeaders()); + popAndDequeue(); + } + alternateExchange->decAlternateUsers(); + } + + if (store) { + store->flush(*this); + store->destroy(*this); + store = 0;//ensure we make no more calls to the store for this queue + } +} + +void Queue::bound(const string& exchange, const string& key, + const FieldTable& args) +{ + bindings.add(exchange, key, args); +} + +void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref) +{ + bindings.unbind(exchanges, shared_ref); +} + +void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) +{ + policy = _policy; +} + +const QueuePolicy* Queue::getPolicy() +{ + return policy.get(); +} + +uint64_t Queue::getPersistenceId() const +{ + return persistenceId; +} + +void Queue::setPersistenceId(uint64_t _persistenceId) const +{ + if (mgmtObject != 0 && persistenceId == 0) + { + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + agent->addObject (mgmtObject, 0x3000000000000000LL + _persistenceId); + + if (externalQueueStore) { + ManagementObject* childObj = externalQueueStore->GetManagementObject(); + if (childObj != 0) + childObj->setReference(mgmtObject->getObjectId()); + } + } + persistenceId = _persistenceId; +} + +void Queue::encode(Buffer& buffer) const +{ + buffer.putShortString(name); + buffer.put(settings); + if (policy.get()) { + buffer.put(*policy); + } +} + +uint32_t Queue::encodedSize() const +{ + return name.size() + 1/*short string size octet*/ + settings.encodedSize() + + (policy.get() ? (*policy).encodedSize() : 0); +} + +Queue::shared_ptr Queue::decode(QueueRegistry& queues, Buffer& buffer) +{ + string name; + buffer.getShortString(name); + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true); + buffer.get(result.first->settings); + result.first->configure(result.first->settings); + if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) { + buffer.get ( *(result.first->policy) ); + } + return result.first; +} + + +void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange) +{ + alternateExchange = exchange; +} + +boost::shared_ptr<Exchange> Queue::getAlternateExchange() +{ + return alternateExchange; +} + +void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +{ + if (broker.getQueues().destroyIf(queue->getName(), + boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { + queue->unbind(broker.getExchanges(), queue); + queue->destroy(); + } +} + +bool Queue::isExclusiveOwner(const OwnershipToken* const o) const +{ + Mutex::ScopedLock locker(ownershipLock); + return o == owner; +} + +void Queue::releaseExclusiveOwnership() +{ + Mutex::ScopedLock locker(ownershipLock); + owner = 0; +} + +bool Queue::setExclusiveOwner(const OwnershipToken* const o) +{ + Mutex::ScopedLock locker(ownershipLock); + if (owner) { + return false; + } else { + owner = o; + return true; + } +} + +bool Queue::hasExclusiveOwner() const +{ + Mutex::ScopedLock locker(ownershipLock); + return owner != 0; +} + +bool Queue::hasExclusiveConsumer() const +{ + return exclusive; +} + +void Queue::setExternalQueueStore(ExternalQueueStore* inst) { + if (externalQueueStore!=inst && externalQueueStore) + delete externalQueueStore; + externalQueueStore = inst; + + if (inst) { + ManagementObject* childObj = inst->GetManagementObject(); + if (childObj != 0 && mgmtObject != 0) + childObj->setReference(mgmtObject->getObjectId()); + } +} + +bool Queue::releaseMessageContent(const QueuedMessage& m) +{ + if (store && !NullMessageStore::isNullStore(store)) { + QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); + m.payload->releaseContent(store); + return true; + } else { + QPID_LOG(warning, "Message " << m.position << " on " << name + << " cannot be released from memory as the queue is not durable"); + return false; + } +} + +ManagementObject* Queue::GetManagementObject (void) const +{ + return (ManagementObject*) mgmtObject; +} + +Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string&) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case _qmf::Queue::METHOD_PURGE : + _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args; + purge (iargs.i_request); + status = Manageable::STATUS_OK; + break; + } + + return status; +} + +void Queue::setPosition(SequenceNumber n) { + Mutex::ScopedLock locker(messageLock); + sequence = n; +} |