diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/Queue.h | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 47 |
1 files changed, 32 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 1294f813aa..bb713eba2b 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -38,7 +38,6 @@ #include "qpid/framing/SequenceNumber.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -56,6 +55,9 @@ #include <algorithm> namespace qpid { +namespace sys { +class TimerTask; +} namespace broker { class Broker; class Exchange; @@ -83,6 +85,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, { Queue& parent; uint count; + qpid::sys::Monitor usageLock; UsageBarrier(Queue&); bool acquire(); @@ -142,18 +145,18 @@ class Queue : public boost::enable_shared_from_this<Queue>, * o consumerCount (TBD: move under separate lock) * o Queue::UsageBarrier (TBD: move under separate lock) */ - mutable qpid::sys::Monitor messageLock; + mutable qpid::sys::Mutex messageLock; mutable qpid::sys::Mutex ownershipLock; mutable uint64_t persistenceId; - const QueueSettings settings; + QueueSettings settings; qpid::framing::FieldTable encodableSettings; QueueDepth current; QueueBindings bindings; std::string alternateExchangeName; boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; - qmf::org::apache::qpid::broker::Queue* mgmtObject; - qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; + qmf::org::apache::qpid::broker::Queue::shared_ptr mgmtObject; + qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject; sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. int eventMode; Observers observers; @@ -189,7 +192,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, int getEventMode(); void dequeueFromStore(boost::intrusive_ptr<PersistableMessage>); void abandoned(const Message& message); - void checkNotDeleted(const Consumer::shared_ptr&); + bool checkNotDeleted(const Consumer::shared_ptr&); void notifyDeleted(); uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType); virtual bool checkDepth(const QueueDepth& increment, const Message&); @@ -338,7 +341,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, * exclusive owner */ static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer); - static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId); + QPID_BROKER_EXTERN static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId); virtual void setExternalQueueStore(ExternalQueueStore* inst); @@ -352,7 +355,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const; // Manageable entry points - QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject (void) const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const; management::Manageable::status_t QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const; @@ -382,15 +385,30 @@ class Queue : public boost::enable_shared_from_this<Queue>, * * The _caller_ must ensure that any messages after pos have been dequeued. * - * Used by HA/cluster code for queue replication. + * Used by HA code for queue replication. */ QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos); /** *@return sequence number for the back of the queue. The next message pushed - * will be at getPosition+1 + * will be at getPosition()+1 */ QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); + + /** + * Set front and back. + * If the queue is empty then front = back+1 (the first message to + * consume will be the next message pushed.) + * + *@param front = Position of first message to consume. + *@param back = getPosition(), next message pushed will be getPosition()+1 + *@param type Subscription type to use to determine the front. + */ + QPID_BROKER_EXTERN void getRange( + framing::SequenceNumber& front, framing::SequenceNumber& back, + SubscriptionType type=CONSUMER + ); + QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>); QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); @@ -399,11 +417,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, */ QPID_BROKER_EXTERN void recoveryComplete(ExchangeRegistry& exchanges); - // For cluster update - QPID_BROKER_EXTERN QueueListeners& getListeners(); - QPID_BROKER_EXTERN Messages& getMessages(); - QPID_BROKER_EXTERN const Messages& getMessages() const; - /** * Reserve space in policy for an enqueued message that * has been recovered in the prepared state (dtx only) @@ -420,6 +433,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value); + + /** Add an argument to be included in management messages about this queue. */ + QPID_BROKER_EXTERN void addArgument(const std::string& key, const types::Variant& value); + friend class QueueFactory; }; } |