diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/SemanticState.h | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.h')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 217 |
1 files changed, 115 insertions, 102 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 9add663e24..ec48ca4753 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -46,10 +46,12 @@ #include <list> #include <map> +#include <set> #include <vector> #include <boost/enable_shared_from_this.hpp> #include <boost/cast.hpp> +#include <boost/tuple/tuple.hpp> namespace qpid { namespace broker { @@ -57,6 +59,7 @@ namespace broker { class Exchange; //class MessageStore; class AsyncStore; +class ProtocolRegistry; class SessionContext; class SessionState; @@ -74,104 +77,18 @@ class SessionState; * called when a client's socket is ready to write data. * */ +class SemanticStateConsumerImpl; class SemanticState : private boost::noncopyable { - public: - class ConsumerImpl : public Consumer, public sys::OutputTask, - public boost::enable_shared_from_this<ConsumerImpl>, - public management::Manageable - { - protected: - mutable qpid::sys::Mutex lock; - SemanticState* const parent; - private: - const boost::shared_ptr<Queue> queue; - const bool ackExpected; - const bool acquire; - bool blocked; - bool exclusive; - std::string resumeId; - const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command - uint64_t resumeTtl; - framing::FieldTable arguments; - Credit credit; - bool notifyEnabled; - const int syncFrequency; - int deliveryCount; - qmf::org::apache::qpid::broker::Subscription* mgmtObject; - - bool checkCredit(const Message& msg); - void allocateCredit(const Message& msg); - bool haveCredit(); - - protected: - QPID_BROKER_EXTERN virtual bool doDispatch(); - size_t unacked() { return parent->unacked.size(); } - QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>); - - public: - typedef boost::shared_ptr<ConsumerImpl> shared_ptr; - - QPID_BROKER_EXTERN ConsumerImpl(SemanticState* parent, - const std::string& name, boost::shared_ptr<Queue> queue, - bool ack, SubscriptionType type, bool exclusive, - const std::string& tag, const std::string& resumeId, - uint64_t resumeTtl, const framing::FieldTable& arguments); - QPID_BROKER_EXTERN ~ConsumerImpl(); - QPID_BROKER_EXTERN OwnershipToken* getSession(); - QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&); - QPID_BROKER_EXTERN bool filter(const Message&); - QPID_BROKER_EXTERN bool accept(const Message&); - QPID_BROKER_EXTERN void cancel() {} - - QPID_BROKER_EXTERN void disableNotify(); - QPID_BROKER_EXTERN void enableNotify(); - QPID_BROKER_EXTERN void notify(); - QPID_BROKER_EXTERN bool isNotifyEnabled() const; - - QPID_BROKER_EXTERN void requestDispatch(); - - QPID_BROKER_EXTERN void setWindowMode(); - QPID_BROKER_EXTERN void setCreditMode(); - QPID_BROKER_EXTERN void addByteCredit(uint32_t value); - QPID_BROKER_EXTERN void addMessageCredit(uint32_t value); - QPID_BROKER_EXTERN void flush(); - QPID_BROKER_EXTERN void stop(); - QPID_BROKER_EXTERN void complete(DeliveryRecord&); - boost::shared_ptr<Queue> getQueue() const { return queue; } - bool isBlocked() const { return blocked; } - bool setBlocked(bool set) { std::swap(set, blocked); return set; } - - QPID_BROKER_EXTERN bool doOutput(); - - Credit& getCredit() { return credit; } - const Credit& getCredit() const { return credit; } - bool isAckExpected() const { return ackExpected; } - bool isAcquire() const { return acquire; } - bool isExclusive() const { return exclusive; } - std::string getResumeId() const { return resumeId; }; - const std::string& getTag() const { return tag; } - uint64_t getResumeTtl() const { return resumeTtl; } - uint32_t getDeliveryCount() const { return deliveryCount; } - void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; } - const framing::FieldTable& getArguments() const { return arguments; } - - SemanticState& getParent() { return *parent; } - const SemanticState& getParent() const { return *parent; } - - void acknowledged(const DeliveryRecord&) {} - - // manageable entry points - QPID_BROKER_EXTERN management::ManagementObject* - GetManagementObject(void) const; - - QPID_BROKER_EXTERN management::Manageable::status_t - ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); - }; + friend class SemanticStateConsumerImpl; + public: + typedef SemanticStateConsumerImpl ConsumerImpl; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; private: - typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; + typedef std::map<std::string, boost::shared_ptr<ConsumerImpl> > ConsumerImplMap; + typedef boost::tuple<std::string, std::string, std::string, std::string> Binding; + typedef std::set<Binding> Bindings; SessionState& session; ConsumerImplMap consumers; @@ -189,13 +106,16 @@ class SemanticState : private boost::noncopyable { //needed for queue delete events in auto-delete: const std::string connectionId; + Bindings bindings; + void checkDtxTimeout(); bool complete(DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); - void cancel(ConsumerImpl::shared_ptr); - void disable(ConsumerImpl::shared_ptr); + void cancel(boost::shared_ptr<ConsumerImpl>); + void disable(boost::shared_ptr<ConsumerImpl>); + void unbindSessionBindings(); public: @@ -205,8 +125,8 @@ class SemanticState : private boost::noncopyable { SessionContext& getSession(); const SessionContext& getSession() const; - const ConsumerImpl::shared_ptr find(const std::string& destination) const; - bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const; + const boost::shared_ptr<ConsumerImpl> find(const std::string& destination) const; + bool find(const std::string& destination, boost::shared_ptr<ConsumerImpl>&) const; /** * Get named queue, never returns 0. @@ -257,11 +177,6 @@ class SemanticState : private boost::noncopyable { void detached(); void closed(); - // Used by cluster to re-create sessions - template <class F> void eachConsumer(F f) { - for(ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); ++i) - f(i->second); - } DeliveryRecords& getUnacked() { return unacked; } framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; } TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; } @@ -271,6 +186,104 @@ class SemanticState : private boost::noncopyable { void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); DtxBufferMap& getSuspendedXids() { return suspendedXids; } + + void addBinding(const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey, const framing::FieldTable& arguments); + void removeBinding(const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey); +}; + +class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask, + public boost::enable_shared_from_this<SemanticStateConsumerImpl>, + public management::Manageable +{ + protected: + mutable qpid::sys::Mutex lock; + SemanticState* const parent; + private: + const boost::shared_ptr<Queue> queue; + const bool ackExpected; + const bool acquire; + bool blocked; + bool exclusive; + std::string resumeId; + const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command + uint64_t resumeTtl; + framing::FieldTable arguments; + Credit credit; + bool notifyEnabled; + const int syncFrequency; + int deliveryCount; + qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject; + ProtocolRegistry& protocols; + + bool checkCredit(const Message& msg); + void allocateCredit(const Message& msg); + bool haveCredit(); + + protected: + QPID_BROKER_EXTERN virtual bool doDispatch(); + size_t unacked() { return parent->unacked.size(); } + QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>); + + public: + typedef boost::shared_ptr<SemanticStateConsumerImpl> shared_ptr; + + QPID_BROKER_EXTERN SemanticStateConsumerImpl(SemanticState* parent, + const std::string& name, boost::shared_ptr<Queue> queue, + bool ack, SubscriptionType type, bool exclusive, + const std::string& tag, const std::string& resumeId, + uint64_t resumeTtl, const framing::FieldTable& arguments); + QPID_BROKER_EXTERN ~SemanticStateConsumerImpl(); + QPID_BROKER_EXTERN OwnershipToken* getSession(); + QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&); + QPID_BROKER_EXTERN bool filter(const Message&); + QPID_BROKER_EXTERN bool accept(const Message&); + QPID_BROKER_EXTERN void cancel() {} + + QPID_BROKER_EXTERN void disableNotify(); + QPID_BROKER_EXTERN void enableNotify(); + QPID_BROKER_EXTERN void notify(); + QPID_BROKER_EXTERN bool isNotifyEnabled() const; + + QPID_BROKER_EXTERN void requestDispatch(); + + QPID_BROKER_EXTERN void setWindowMode(); + QPID_BROKER_EXTERN void setCreditMode(); + QPID_BROKER_EXTERN void addByteCredit(uint32_t value); + QPID_BROKER_EXTERN void addMessageCredit(uint32_t value); + QPID_BROKER_EXTERN void flush(); + QPID_BROKER_EXTERN void stop(); + QPID_BROKER_EXTERN void complete(DeliveryRecord&); + boost::shared_ptr<Queue> getQueue() const { return queue; } + bool isBlocked() const { return blocked; } + bool setBlocked(bool set) { std::swap(set, blocked); return set; } + + QPID_BROKER_EXTERN bool doOutput(); + + Credit& getCredit() { return credit; } + const Credit& getCredit() const { return credit; } + bool isAckExpected() const { return ackExpected; } + bool isAcquire() const { return acquire; } + bool isExclusive() const { return exclusive; } + std::string getResumeId() const { return resumeId; }; + const std::string& getTag() const { return tag; } + uint64_t getResumeTtl() const { return resumeTtl; } + uint32_t getDeliveryCount() const { return deliveryCount; } + void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; } + const framing::FieldTable& getArguments() const { return arguments; } + + SemanticState& getParent() { return *parent; } + const SemanticState& getParent() const { return *parent; } + + void acknowledged(const DeliveryRecord&) {} + + // manageable entry points + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr + GetManagementObject(void) const; + + QPID_BROKER_EXTERN management::Manageable::status_t + ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); }; }} // namespace qpid::broker |