diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/Queue.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 145 |
1 files changed, 78 insertions, 67 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index f595b81724..bab6f2ea55 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -44,9 +44,9 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" -#include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" #include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" @@ -79,8 +79,8 @@ namespace { inline void mgntEnqStats(const Message& msg, - _qmf::Queue* mgmtObject, - _qmf::Broker* brokerMgmtObject) + _qmf::Queue::shared_ptr mgmtObject, + _qmf::Broker::shared_ptr brokerMgmtObject) { if (mgmtObject != 0) { _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); @@ -103,8 +103,8 @@ inline void mgntEnqStats(const Message& msg, } inline void mgntDeqStats(const Message& msg, - _qmf::Queue* mgmtObject, - _qmf::Broker* brokerMgmtObject) + _qmf::Queue::shared_ptr mgmtObject, + _qmf::Broker::shared_ptr brokerMgmtObject) { if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); @@ -166,13 +166,11 @@ void Queue::TxPublish::rollback() throw() } Queue::Queue(const string& _name, const QueueSettings& _settings, -// MessageStore* const _store, AsyncStore* const _asyncStore, Manageable* parent, Broker* b) : name(_name), -// store(_store), asyncStore(_asyncStore), owner(0), consumerCount(0), @@ -183,8 +181,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, messages(new MessageDeque()), persistenceId(0), settings(b ? merge(_settings, b->getOptions()) : _settings), - mgmtObject(0), - brokerMgmtObject(0), eventMode(0), broker(b), deleted(false), @@ -199,27 +195,24 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, qpid::amqp_0_10::translate(settings.asMap(), encodableSettings); if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); - if (agent != 0) { -// mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete); - mgmtObject = new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete); + mgmtObject = _qmf::Queue::shared_ptr( + new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete)); mgmtObject->set_arguments(settings.asMap()); -// agent->addObject(mgmtObject, 0, store != 0); agent->addObject(mgmtObject, 0, asyncStore != 0); - brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); + brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject()); if (brokerMgmtObject) brokerMgmtObject->inc_queueCount(); } } + + if ( settings.isBrowseOnly ) { + QPID_LOG ( info, "Queue " << name << " is browse-only." ); + } } Queue::~Queue() { - if (mgmtObject != 0) { - mgmtObject->resourceDestroy(); - if (brokerMgmtObject) - brokerMgmtObject->dec_queueCount(); - } } bool isLocalTo(const OwnershipToken* token, const Message& msg) @@ -246,9 +239,6 @@ void Queue::deliver(Message msg, TxBuffer* txn){ //'link' for whatever protocol is used; that would let protocol //specific stuff be kept out the queue - // Check for deferred delivery in a cluster. - if (broker && broker->deferDelivery(name, msg)) - return; if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg, 0); @@ -307,7 +297,6 @@ void Queue::process(Message& msg) void Queue::release(const QueueCursor& position, bool markRedelivered) { - assertClusterSafe(); QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); @@ -333,7 +322,6 @@ bool Queue::dequeueMessageAt(const SequenceNumber& position) boost::intrusive_ptr<PersistableMessage> pmsg; { Mutex::ScopedLock locker(messageLock); - assertClusterSafe(); QPID_LOG(debug, "Attempting to dequeue message at " << position); QueueCursor cursor; Message* msg = messages->find(position, &cursor); @@ -353,7 +341,6 @@ bool Queue::dequeueMessageAt(const SequenceNumber& position) bool Queue::acquire(const QueueCursor& position, const std::string& consumer) { Mutex::ScopedLock locker(messageLock); - assertClusterSafe(); Message* msg; msg = messages->find(position); @@ -375,12 +362,13 @@ bool Queue::acquire(const QueueCursor& position, const std::string& consumer) bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) { - checkNotDeleted(c); + if (!checkNotDeleted(c)) return false; QueueListeners::NotificationSet set; while (true) { //TODO: reduce lock scope Mutex::ScopedLock locker(messageLock); - Message* msg = messages->next(*c); + QueueCursor cursor = c->getCursor(); // Save current position. + Message* msg = messages->next(*c); // Advances c. if (msg) { if (msg->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); @@ -419,6 +407,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) } else { //message(s) are available but consumer hasn't got enough credit QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + c->setCursor(cursor); // Restore cursor, will try again with credit if (c->preAcquires()) { //let someone else try listeners.populate(set); @@ -480,7 +469,6 @@ bool Queue::find(SequenceNumber pos, Message& msg) const void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) { - assertClusterSafe(); { Mutex::ScopedLock locker(messageLock); // NOTE: consumerCount is actually a count of all @@ -488,6 +476,11 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) // Check for exclusivity of acquiring consumers. size_t acquiringConsumers = consumerCount - browserCount; if (c->preAcquires()) { + if(settings.isBrowseOnly) { + throw NotAllowedException( + QPID_MSG("Queue " << name << " is browse only. Refusing acquiring consumer.")); + } + if(exclusive) { throw ResourceLockedException( QPID_MSG("Queue " << getName() @@ -502,22 +495,29 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) } } } - else + else if(c->isCounted()) { browserCount++; - consumerCount++; - //reset auto deletion timer if necessary - if (settings.autoDeleteDelay && autoDeleteTask) { - autoDeleteTask->cancel(); } - observeConsumerAdd(*c, locker); + if(c->isCounted()) { + consumerCount++; + + //reset auto deletion timer if necessary + if (settings.autoDeleteDelay && autoDeleteTask) { + autoDeleteTask->cancel(); + } + + observeConsumerAdd(*c, locker); + } + } + if (mgmtObject != 0 && c->isCounted()) { + mgmtObject->inc_consumerCount(); } - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); } void Queue::cancel(Consumer::shared_ptr c) { removeListener(c); + if(c->isCounted()) { Mutex::ScopedLock locker(messageLock); consumerCount--; @@ -525,8 +525,9 @@ void Queue::cancel(Consumer::shared_ptr c) if(exclusive) exclusive = 0; observeConsumerRemove(*c, locker); } - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); + if (mgmtObject != 0 && c->isCounted()) { + mgmtObject->dec_consumerCount(); + } } /** @@ -733,7 +734,6 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, void Queue::push(Message& message, bool /*isRecovery*/) { - assertClusterSafe(); QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); @@ -1101,8 +1101,16 @@ void Queue::destroyed() notifyDeleted(); { Mutex::ScopedLock lock(messageLock); + for_each(observers.begin(), observers.end(), + boost::bind(&QueueObserver::destroy, _1)); observers.clear(); } + + if (mgmtObject != 0) { + mgmtObject->resourceDestroy(); + if (brokerMgmtObject) + brokerMgmtObject->dec_queueCount(); + } } void Queue::notifyDeleted() @@ -1136,7 +1144,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const { if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore) { - ManagementObject* childObj = externalQueueStore->GetManagementObject(); + ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject(); if (childObj != 0) childObj->setReference(mgmtObject->getObjectId()); } @@ -1180,6 +1188,7 @@ Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer ) void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange) { alternateExchange = exchange; + alternateExchange->incAlternateUsers(); if (mgmtObject) { if (exchange.get() != 0) mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId()); @@ -1197,14 +1206,10 @@ void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::strin { if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { - QPID_LOG(debug, "Auto-deleting " << queue->getName()); - queue->destroyed(); - - if (broker.getManagementAgent()) - broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName())); - QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName() + QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName() << " user:" << userId << " rhost:" << connectionId ); + queue->destroyed(); } } @@ -1233,7 +1238,7 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::st if (queue->settings.autoDeleteDelay && queue->canAutoDelete()) { AbsTime time(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC)); queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time)); - broker.getClusterTimer().add(queue->autoDeleteTask); + broker.getTimer().add(queue->autoDeleteTask); QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); } else { tryAutoDeleteImpl(broker, queue, connectionId, userId); @@ -1290,7 +1295,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { externalQueueStore = inst; if (inst) { - ManagementObject* childObj = inst->GetManagementObject(); + ManagementObject::shared_ptr childObj = inst->GetManagementObject(); if (childObj != 0 && mgmtObject != 0) childObj->setReference(mgmtObject->getObjectId()); } @@ -1378,9 +1383,9 @@ void Queue::countLoadedFromDisk(uint64_t size) const } -ManagementObject* Queue::GetManagementObject (void) const +ManagementObject::shared_ptr Queue::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext) @@ -1459,6 +1464,16 @@ SequenceNumber Queue::getPosition() { return sequence; } +void Queue::getRange(framing::SequenceNumber& front, framing::SequenceNumber& back, + SubscriptionType type) +{ + Mutex::ScopedLock locker(messageLock); + QueueCursor cursor(type); + back = sequence; + Message* message = messages->next(cursor); + front = message ? message->getSequence() : back+1; +} + int Queue::getEventMode() { return eventMode; } void Queue::recoveryComplete(ExchangeRegistry& exchanges) @@ -1493,20 +1508,11 @@ void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&) mgntEnqStats(m, mgmtObject, brokerMgmtObject); } -// Note: accessing listeners outside of lock is dangerous. Caller must ensure the queue's -// state is not changed while listeners is referenced. -QueueListeners& Queue::getListeners() { return listeners; } - -// Note: accessing messages outside of lock is dangerous. Caller must ensure the queue's -// state is not changed while messages is referenced. -Messages& Queue::getMessages() { return *messages; } -const Messages& Queue::getMessages() const { return *messages; } - -void Queue::checkNotDeleted(const Consumer::shared_ptr& c) +bool Queue::checkNotDeleted(const Consumer::shared_ptr& c) { - if (deleted && !c->hideDeletedError()) { + if (deleted && !c->hideDeletedError()) throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted.")); - } + return !deleted; } void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) @@ -1641,7 +1647,7 @@ Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() { - Monitor::ScopedLock l(parent.messageLock); /** @todo: use a dedicated lock instead of messageLock */ + Monitor::ScopedLock l(usageLock); if (parent.deleted) { return false; } else { @@ -1652,15 +1658,20 @@ bool Queue::UsageBarrier::acquire() void Queue::UsageBarrier::release() { - Monitor::ScopedLock l(parent.messageLock); - if (--count == 0) parent.messageLock.notifyAll(); + Monitor::ScopedLock l(usageLock); + if (--count == 0) usageLock.notifyAll(); } void Queue::UsageBarrier::destroy() { - Monitor::ScopedLock l(parent.messageLock); + Monitor::ScopedLock l(usageLock); parent.deleted = true; - while (count) parent.messageLock.wait(); + while (count) usageLock.wait(); +} + +void Queue::addArgument(const string& key, const types::Variant& value) { + settings.original.insert(types::Variant::Map::value_type(key, value)); + if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap()); } }} |