diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.h')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 24 |
1 files changed, 10 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index a69962c083..0f2e08cb3c 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -55,9 +55,7 @@ class SessionContext; * SemanticState holds the L3 and L4 state of an open session, whether * attached to a channel or suspended. */ -class SemanticState : public sys::OutputTask, - private boost::noncopyable -{ +class SemanticState : private boost::noncopyable { public: class ConsumerImpl : public Consumer, public sys::OutputTask, public boost::enable_shared_from_this<ConsumerImpl> @@ -77,9 +75,6 @@ class SemanticState : public sys::OutputTask, uint32_t msgCredit; uint32_t byteCredit; bool notifyEnabled; - // queueHasMessages is boolean but valgrind has trouble with - // AtomicValue<bool> so use an int with 1 or 0. - sys:: AtomicValue<int> queueHasMessages; const int syncFrequency; int deliveryCount; @@ -105,6 +100,8 @@ class SemanticState : public sys::OutputTask, void notify(); bool isNotifyEnabled() const; + void requestDispatch(); + void setWindowMode(); void setCreditMode(); void addByteCredit(uint32_t value); @@ -130,6 +127,8 @@ class SemanticState : public sys::OutputTask, std::string getResumeId() const { return resumeId; }; uint64_t getResumeTtl() const { return resumeTtl; } const framing::FieldTable& getArguments() const { return arguments; } + + SemanticState& getParent() { return *parent; } }; private: @@ -147,7 +146,6 @@ class SemanticState : public sys::OutputTask, DtxBufferMap suspendedXids; framing::SequenceSet accumulatedAck; boost::shared_ptr<Exchange> cacheExchange; - sys::AggregateOutput outputTasks; AclModule* acl; const bool authMsg; const string userID; @@ -158,7 +156,6 @@ class SemanticState : public sys::OutputTask, bool complete(DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); - void requestDispatch(ConsumerImpl&); void cancel(ConsumerImpl::shared_ptr); public: @@ -208,8 +205,6 @@ class SemanticState : public sys::OutputTask, void release(DeliveryId first, DeliveryId last, bool setRedelivered); void reject(DeliveryId first, DeliveryId last); void handle(boost::intrusive_ptr<Message> msg); - bool hasOutput() { return outputTasks.hasOutput(); } - bool doOutput() { return outputTasks.doOutput(); } //final 0-10 spec (completed and accepted are distinct): void completed(DeliveryId deliveryTag, DeliveryId endTag); @@ -218,10 +213,11 @@ class SemanticState : public sys::OutputTask, void attached(); void detached(); - // Used by cluster to re-create replica sessions - static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); } - - template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); } + // Used by cluster to re-create sessions + template <class F> void eachConsumer(F f) { + for(ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); ++i) + f(i->second); + } DeliveryRecords& getUnacked() { return unacked; } framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; } TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; } |