summaryrefslogtreecommitdiff
path: root/RC9/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'RC9/qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--RC9/qpid/cpp/src/qpid/broker/Queue.cpp900
1 files changed, 900 insertions, 0 deletions
diff --git a/RC9/qpid/cpp/src/qpid/broker/Queue.cpp b/RC9/qpid/cpp/src/qpid/broker/Queue.cpp
new file mode 100644
index 0000000000..9089ba0c54
--- /dev/null
+++ b/RC9/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -0,0 +1,900 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Broker.h"
+#include "Queue.h"
+#include "Exchange.h"
+#include "DeliverableMessage.h"
+#include "MessageStore.h"
+#include "NullMessageStore.h"
+#include "QueueRegistry.h"
+
+#include "qpid/StringUtils.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Time.h"
+#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
+
+#include <iostream>
+#include <algorithm>
+#include <functional>
+
+#include <boost/bind.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using std::for_each;
+using std::mem_fun;
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+
+namespace
+{
+const std::string qpidMaxSize("qpid.max_size");
+const std::string qpidMaxCount("qpid.max_count");
+const std::string qpidNoLocal("no-local");
+const std::string qpidTraceIdentity("qpid.trace.id");
+const std::string qpidTraceExclude("qpid.trace.exclude");
+const std::string qpidLastValueQueue("qpid.last_value_queue");
+const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
+const std::string qpidPersistLastNode("qpid.persist_last_node");
+const std::string qpidVQMatchProperty("qpid.LVQ_key");
+}
+
+
+Queue::Queue(const string& _name, bool _autodelete,
+ MessageStore* const _store,
+ const OwnershipToken* const _owner,
+ Manageable* parent) :
+
+ name(_name),
+ autodelete(_autodelete),
+ store(_store),
+ owner(_owner),
+ consumerCount(0),
+ exclusive(0),
+ noLocal(false),
+ lastValueQueue(false),
+ lastValueQueueNoBrowse(false),
+ persistLastNode(false),
+ inLastNodeFailure(false),
+ persistenceId(0),
+ policyExceeded(false),
+ mgmtObject(0)
+{
+ if (parent != 0)
+ {
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+
+ if (agent != 0)
+ {
+ mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0);
+
+ // Add the object to the management agent only if this queue is not durable.
+ // If it's durable, we will add it later when the queue is assigned a persistenceId.
+ if (store == 0)
+ agent->addObject (mgmtObject);
+ }
+ }
+}
+
+Queue::~Queue()
+{
+ if (mgmtObject != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+void Queue::notifyDurableIOComplete()
+{
+ QueueListeners::NotificationSet copy;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ listeners.populate(copy);
+ }
+ copy.notify();
+}
+
+bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
+{
+ return token && token->isLocal(msg->getPublisher());
+}
+
+bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
+{
+ //message is considered local if it was published on the same
+ //connection as that of the session which declared this queue
+ //exclusive (owner) or which has an exclusive subscription
+ //(exclusive)
+ return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
+}
+
+bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
+{
+ return traceExclude.size() && msg->isExcluded(traceExclude);
+}
+
+void Queue::deliver(boost::intrusive_ptr<Message>& msg){
+
+ if (msg->isImmediate() && getConsumerCount() == 0) {
+ if (alternateExchange) {
+ DeliverableMessage deliverable(msg);
+ alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
+ }
+ } else if (isLocal(msg)) {
+ //drop message
+ QPID_LOG(info, "Dropping 'local' message from " << getName());
+ } else if (isExcluded(msg)) {
+ //drop message
+ QPID_LOG(info, "Dropping excluded message from " << getName());
+ } else {
+ // if no store then mark as enqueued
+ if (!enqueue(0, msg)){
+ push(msg);
+ msg->enqueueComplete();
+ }else {
+ push(msg);
+ }
+ mgntEnqStats(msg);
+ QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
+ }
+}
+
+
+void Queue::recover(boost::intrusive_ptr<Message>& msg){
+ push(msg);
+ msg->enqueueComplete(); // mark the message as enqueued
+ mgntEnqStats(msg);
+
+ if (store && !msg->isContentLoaded()) {
+ //content has not been loaded, need to ensure that lazy loading mode is set:
+ //TODO: find a nicer way to do this
+ msg->releaseContent(store);
+ }
+}
+
+void Queue::process(boost::intrusive_ptr<Message>& msg){
+ push(msg);
+ mgntEnqStats(msg);
+ if (mgmtObject != 0){
+ mgmtObject->inc_msgTxnEnqueues ();
+ mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ }
+}
+
+void Queue::requeue(const QueuedMessage& msg){
+ if (policy.get() && !policy->isEnqueued(msg)) return;
+
+ QueueListeners::NotificationSet copy;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ msg.payload->enqueueComplete(); // mark the message as enqueued
+ messages.push_front(msg);
+ listeners.populate(copy);
+ }
+ copy.notify();
+}
+
+void Queue::clearLVQIndex(const QueuedMessage& msg){
+ if (lastValueQueue){
+ const framing::FieldTable* ft = msg.payload->getApplicationHeaders();
+ string key = ft->getAsString(qpidVQMatchProperty);
+ lvq.erase(key);
+ }
+}
+
+bool Queue::acquire(const QueuedMessage& msg) {
+ Mutex::ScopedLock locker(messageLock);
+ QPID_LOG(debug, "attempting to acquire " << msg.position);
+ for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+ if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set
+ || (lastValueQueue && (i->position == msg.position) &&
+ msg.payload.get() == checkLvqReplace(*i).payload.get()) ) {
+
+ clearLVQIndex(msg);
+ messages.erase(i);
+ QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position);
+ return true;
+ } else {
+ QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
+ }
+ }
+ QPID_LOG(debug, "Acquire failed for " << msg.position);
+ return false;
+}
+
+bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+{
+ if (c->preAcquires()) {
+ return consumeNextMessage(m, c);
+ } else {
+ return browseNextMessage(m, c);
+ }
+}
+
+bool Queue::checkForMessages(Consumer::shared_ptr c)
+{
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.empty()) {
+ //no message available, register consumer for notification
+ //when this changes
+ listeners.addListener(c);
+ return false;
+ } else {
+ QueuedMessage msg = getFront();
+ if (store && !msg.payload->isEnqueueComplete()) {
+ //though a message is on the queue, it has not yet been
+ //enqueued and so is not available for consumption yet,
+ //register consumer for notification when this changes
+ listeners.addListener(c);
+ return false;
+ } else {
+ //check that consumer has sufficient credit for the
+ //message (if it does not, no need to register it for
+ //notification as the consumer itself will handle the
+ //credit allocation required to change this condition).
+ return c->accept(msg.payload);
+ }
+ }
+}
+
+bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+{
+ while (true) {
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.empty()) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ listeners.addListener(c);
+ return false;
+ } else {
+ QueuedMessage msg = getFront();
+ if (msg.payload->hasExpired()) {
+ QPID_LOG(debug, "Message expired from queue '" << name << "'");
+ popAndDequeue();
+ continue;
+ }
+
+ if (c->filter(msg.payload)) {
+ if (c->accept(msg.payload)) {
+ m = msg;
+ popMsg(msg);
+ return true;
+ } else {
+ //message(s) are available but consumer hasn't got enough credit
+ QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ return false;
+ }
+ } else {
+ //consumer will never want this message
+ QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
+ return false;
+ }
+ }
+ }
+}
+
+
+bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+{
+ QueuedMessage msg(this);
+ while (seek(msg, c)) {
+ if (c->filter(msg.payload) && !msg.payload->hasExpired()) {
+ if (c->accept(msg.payload)) {
+ //consumer wants the message
+ c->position = msg.position;
+ m = msg;
+ if (!lastValueQueueNoBrowse) clearLVQIndex(msg);
+ if (lastValueQueue) {
+ boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
+ if (replacement.get()) m.payload = replacement;
+ }
+ return true;
+ } else {
+ //browser hasn't got enough credit for the message
+ QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'");
+ return false;
+ }
+ } else {
+ //consumer will never want this message, continue seeking
+ c->position = msg.position;
+ QPID_LOG(debug, "Browser skipping message from '" << name << "'");
+ }
+ }
+ return false;
+}
+
+void Queue::removeListener(Consumer::shared_ptr c)
+{
+ QueueListeners::NotificationSet set;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ listeners.removeListener(c);
+ if (messages.size()) {
+ listeners.populate(set);
+ }
+ }
+ set.notify();
+}
+
+bool Queue::dispatch(Consumer::shared_ptr c)
+{
+ QueuedMessage msg(this);
+ if (getNextMessage(msg, c)) {
+ c->deliver(msg);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
+ Mutex::ScopedLock locker(messageLock);
+ if (!messages.empty() && messages.back().position > c->position) {
+ if (c->position < getFront().position) {
+ msg = getFront();
+ return true;
+ } else {
+ //TODO: can improve performance of this search, for now just searching linearly from end
+ Messages::reverse_iterator pos;
+ for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) {
+ pos = i;
+ }
+ msg = *pos;
+ return true;
+ }
+ }
+ listeners.addListener(c);
+ return false;
+}
+
+namespace {
+struct PositionEquals {
+ SequenceNumber pos;
+ PositionEquals(SequenceNumber p) : pos(p) {}
+ bool operator()(const QueuedMessage& msg) const { return msg.position == pos; }
+};
+}// namespace
+
+QueuedMessage Queue::find(SequenceNumber pos) const {
+ Mutex::ScopedLock locker(messageLock);
+ Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
+ if (i != messages.end())
+ return *i;
+ return QueuedMessage();
+}
+
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
+ }
+ consumerCount++;
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
+}
+
+void Queue::cancel(Consumer::shared_ptr c){
+ removeListener(c);
+ Mutex::ScopedLock locker(consumerLock);
+ consumerCount--;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
+}
+
+QueuedMessage Queue::get(){
+ Mutex::ScopedLock locker(messageLock);
+ QueuedMessage msg(this);
+
+ if(!messages.empty()){
+ msg = getFront();
+ popMsg(msg);
+ }
+ return msg;
+}
+
+void Queue::purgeExpired()
+{
+ //As expired messages are discarded during dequeue also, only
+ //bother explicitly expiring if the rate of dequeues since last
+ //attempt is less than one per second.
+ if (dequeueTracker.sampleRatePerSecond() < 1) {
+ Messages expired;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ for (Messages::iterator i = messages.begin(); i != messages.end();) {
+ if (lastValueQueue) checkLvqReplace(*i);
+ if (i->payload->hasExpired()) {
+ expired.push_back(*i);
+ i = messages.erase(i);
+ } else {
+ ++i;
+ }
+ }
+ }
+ for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ }
+}
+
+/**
+ * purge - for purging all or some messages on a queue
+ * depending on the purge_request
+ *
+ * purge_request == 0 then purge all messages
+ * == N then purge N messages from queue
+ * Sometimes purge_request == 1 to unblock the top of queue
+ */
+uint32_t Queue::purge(const uint32_t purge_request){
+ Mutex::ScopedLock locker(messageLock);
+ uint32_t purge_count = purge_request; // only comes into play if >0
+
+ uint32_t count = 0;
+ // Either purge them all or just the some (purge_count) while the queue isn't empty.
+ while((!purge_request || purge_count--) && !messages.empty()) {
+ popAndDequeue();
+ count++;
+ }
+ return count;
+}
+
+uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
+ Mutex::ScopedLock locker(messageLock);
+ uint32_t move_count = qty; // only comes into play if qty >0
+ uint32_t count = 0; // count how many were moved for returning
+
+ while((!qty || move_count--) && !messages.empty()) {
+ QueuedMessage qmsg = getFront();
+ boost::intrusive_ptr<Message> msg = qmsg.payload;
+ destq->deliver(msg); // deliver message to the destination queue
+ popMsg(qmsg);
+ dequeue(0, qmsg);
+ count++;
+ }
+ return count;
+}
+
+void Queue::popMsg(QueuedMessage& qmsg)
+{
+ if (lastValueQueue){
+ const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
+ string key = ft->getAsString(qpidVQMatchProperty);
+ lvq.erase(key);
+ }
+ messages.pop_front();
+ ++dequeueTracker;
+}
+
+void Queue::push(boost::intrusive_ptr<Message>& msg){
+ QueueListeners::NotificationSet copy;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ QueuedMessage qm(this, msg, ++sequence);
+ if (policy.get()) policy->tryEnqueue(qm);
+
+ LVQ::iterator i;
+ if (lastValueQueue){
+ const framing::FieldTable* ft = msg->getApplicationHeaders();
+ string key = ft->getAsString(qpidVQMatchProperty);
+
+ i = lvq.find(key);
+ if (i == lvq.end()){
+ messages.push_back(qm);
+ listeners.populate(copy);
+ lvq[key] = msg;
+ }else {
+ i->second->setReplacementMessage(msg,this);
+ qm.payload = i->second;
+ dequeued(qm);
+ }
+ }else {
+ messages.push_back(qm);
+ listeners.populate(copy);
+ }
+ }
+ copy.notify();
+}
+
+QueuedMessage Queue::getFront()
+{
+ QueuedMessage msg = messages.front();
+ if (lastValueQueue) {
+ boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
+ if (replacement.get()) msg.payload = replacement;
+ }
+ return msg;
+}
+
+QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) const
+{
+ boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
+ if (replacement.get()) msg.payload = replacement;
+ return msg;
+}
+
+/** function only provided for unit tests, or code not in critical message path */
+uint32_t Queue::getMessageCount() const
+{
+ Mutex::ScopedLock locker(messageLock);
+
+ uint32_t count = 0;
+ for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
+ //NOTE: don't need to use checkLvqReplace() here as it
+ //is only relevant for LVQ which does not support persistence
+ //so the enqueueComplete check has no effect
+ if ( i->payload->isEnqueueComplete() ) count ++;
+ }
+
+ return count;
+}
+
+uint32_t Queue::getConsumerCount() const
+{
+ Mutex::ScopedLock locker(consumerLock);
+ return consumerCount;
+}
+
+bool Queue::canAutoDelete() const
+{
+ Mutex::ScopedLock locker(consumerLock);
+ return autodelete && !consumerCount;
+}
+
+void Queue::clearLastNodeFailure()
+{
+ inLastNodeFailure = false;
+}
+
+void Queue::setLastNodeFailure()
+{
+ if (persistLastNode){
+ Mutex::ScopedLock locker(messageLock);
+ for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
+ if (lastValueQueue) checkLvqReplace(*i);
+ i->payload->forcePersistent();
+ if (i->payload->getPersistenceId() == 0){
+ enqueue(0, i->payload);
+ }
+ }
+ inLastNodeFailure = true;
+ }
+}
+
+// return true if store exists,
+bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
+{
+ if (inLastNodeFailure && persistLastNode){
+ msg->forcePersistent();
+ }
+
+ if (traceId.size()) {
+ msg->addTraceId(traceId);
+ }
+
+ if (msg->isPersistent() && store && !lastValueQueue) {
+ msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+ store->enqueue(ctxt, pmsg, *this);
+ return true;
+ }
+ return false;
+}
+
+// return true if store exists,
+bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
+{
+ if (policy.get() && !policy->isEnqueued(msg)) return false;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (!ctxt) {
+ dequeued(msg);
+ }
+ }
+ if (msg.payload->isPersistent() && store && !lastValueQueue) {
+ msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
+ store->dequeue(ctxt, pmsg, *this);
+ return true;
+ }
+ return false;
+}
+
+void Queue::dequeueCommitted(const QueuedMessage& msg)
+{
+ Mutex::ScopedLock locker(messageLock);
+ dequeued(msg);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTxnDequeues();
+ mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ }
+}
+
+/**
+ * Removes a message from the in-memory delivery queue as well
+ * dequeing it from the logical (and persistent if applicable) queue
+ */
+void Queue::popAndDequeue()
+{
+ QueuedMessage msg = getFront();
+ popMsg(msg);
+ dequeue(0, msg);
+}
+
+/**
+ * Updates policy and management when a message has been dequeued,
+ * expects messageLock to be held
+ */
+void Queue::dequeued(const QueuedMessage& msg)
+{
+ if (policy.get()) policy->dequeued(msg);
+ mgntDeqStats(msg.payload);
+}
+
+
+void Queue::create(const FieldTable& _settings)
+{
+ settings = _settings;
+ if (store) {
+ store->create(*this, _settings);
+ }
+ configure(_settings);
+}
+
+void Queue::configure(const FieldTable& _settings)
+{
+ setPolicy(QueuePolicy::createQueuePolicy(_settings));
+ //set this regardless of owner to allow use of no-local with exclusive consumers also
+ noLocal = _settings.get(qpidNoLocal);
+ QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+
+ lastValueQueue= _settings.get(qpidLastValueQueue);
+ if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+
+ lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
+ if (lastValueQueueNoBrowse){
+ QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
+ lastValueQueue = lastValueQueueNoBrowse;
+ }
+
+ persistLastNode= _settings.get(qpidPersistLastNode);
+ if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+
+ traceId = _settings.getAsString(qpidTraceIdentity);
+ std::string excludeList = _settings.getAsString(qpidTraceExclude);
+ if (excludeList.size()) {
+ split(traceExclude, excludeList, ", ");
+ }
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
+ << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
+
+ if (mgmtObject != 0)
+ mgmtObject->set_arguments (_settings);
+}
+
+void Queue::destroy()
+{
+ if (alternateExchange.get()) {
+ Mutex::ScopedLock locker(messageLock);
+ while(!messages.empty()){
+ DeliverableMessage msg(getFront().payload);
+ alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
+ msg.getMessage().getApplicationHeaders());
+ popAndDequeue();
+ }
+ alternateExchange->decAlternateUsers();
+ }
+
+ if (store) {
+ store->flush(*this);
+ store->destroy(*this);
+ store = 0;//ensure we make no more calls to the store for this queue
+ }
+}
+
+void Queue::bound(const string& exchange, const string& key,
+ const FieldTable& args)
+{
+ bindings.add(exchange, key, args);
+}
+
+void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref)
+{
+ bindings.unbind(exchanges, shared_ref);
+}
+
+void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
+{
+ policy = _policy;
+}
+
+const QueuePolicy* Queue::getPolicy()
+{
+ return policy.get();
+}
+
+uint64_t Queue::getPersistenceId() const
+{
+ return persistenceId;
+}
+
+void Queue::setPersistenceId(uint64_t _persistenceId) const
+{
+ if (mgmtObject != 0 && persistenceId == 0)
+ {
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ agent->addObject (mgmtObject, 0x3000000000000000LL + _persistenceId);
+
+ if (externalQueueStore) {
+ ManagementObject* childObj = externalQueueStore->GetManagementObject();
+ if (childObj != 0)
+ childObj->setReference(mgmtObject->getObjectId());
+ }
+ }
+ persistenceId = _persistenceId;
+}
+
+void Queue::encode(Buffer& buffer) const
+{
+ buffer.putShortString(name);
+ buffer.put(settings);
+ if (policy.get()) {
+ buffer.put(*policy);
+ }
+}
+
+uint32_t Queue::encodedSize() const
+{
+ return name.size() + 1/*short string size octet*/ + settings.encodedSize()
+ + (policy.get() ? (*policy).encodedSize() : 0);
+}
+
+Queue::shared_ptr Queue::decode(QueueRegistry& queues, Buffer& buffer)
+{
+ string name;
+ buffer.getShortString(name);
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+ buffer.get(result.first->settings);
+ result.first->configure(result.first->settings);
+ if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) {
+ buffer.get ( *(result.first->policy) );
+ }
+ return result.first;
+}
+
+
+void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
+{
+ alternateExchange = exchange;
+}
+
+boost::shared_ptr<Exchange> Queue::getAlternateExchange()
+{
+ return alternateExchange;
+}
+
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+ if (broker.getQueues().destroyIf(queue->getName(),
+ boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+ queue->unbind(broker.getExchanges(), queue);
+ queue->destroy();
+ }
+}
+
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ return o == owner;
+}
+
+void Queue::releaseExclusiveOwnership()
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ owner = 0;
+}
+
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ if (owner) {
+ return false;
+ } else {
+ owner = o;
+ return true;
+ }
+}
+
+bool Queue::hasExclusiveOwner() const
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ return owner != 0;
+}
+
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
+}
+
+void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
+ if (externalQueueStore!=inst && externalQueueStore)
+ delete externalQueueStore;
+ externalQueueStore = inst;
+
+ if (inst) {
+ ManagementObject* childObj = inst->GetManagementObject();
+ if (childObj != 0 && mgmtObject != 0)
+ childObj->setReference(mgmtObject->getObjectId());
+ }
+}
+
+bool Queue::releaseMessageContent(const QueuedMessage& m)
+{
+ if (store && !NullMessageStore::isNullStore(store)) {
+ QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
+ m.payload->releaseContent(store);
+ return true;
+ } else {
+ QPID_LOG(warning, "Message " << m.position << " on " << name
+ << " cannot be released from memory as the queue is not durable");
+ return false;
+ }
+}
+
+ManagementObject* Queue::GetManagementObject (void) const
+{
+ return (ManagementObject*) mgmtObject;
+}
+
+Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string&)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case _qmf::Queue::METHOD_PURGE :
+ _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args;
+ purge (iargs.i_request);
+ status = Manageable::STATUS_OK;
+ break;
+ }
+
+ return status;
+}
+
+void Queue::setPosition(SequenceNumber n) {
+ Mutex::ScopedLock locker(messageLock);
+ sequence = n;
+}