summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.h')
-rw-r--r--cpp/src/qpid/broker/SemanticState.h24
1 files changed, 10 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index a69962c083..0f2e08cb3c 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -55,9 +55,7 @@ class SessionContext;
* SemanticState holds the L3 and L4 state of an open session, whether
* attached to a channel or suspended.
*/
-class SemanticState : public sys::OutputTask,
- private boost::noncopyable
-{
+class SemanticState : private boost::noncopyable {
public:
class ConsumerImpl : public Consumer, public sys::OutputTask,
public boost::enable_shared_from_this<ConsumerImpl>
@@ -77,9 +75,6 @@ class SemanticState : public sys::OutputTask,
uint32_t msgCredit;
uint32_t byteCredit;
bool notifyEnabled;
- // queueHasMessages is boolean but valgrind has trouble with
- // AtomicValue<bool> so use an int with 1 or 0.
- sys:: AtomicValue<int> queueHasMessages;
const int syncFrequency;
int deliveryCount;
@@ -105,6 +100,8 @@ class SemanticState : public sys::OutputTask,
void notify();
bool isNotifyEnabled() const;
+ void requestDispatch();
+
void setWindowMode();
void setCreditMode();
void addByteCredit(uint32_t value);
@@ -130,6 +127,8 @@ class SemanticState : public sys::OutputTask,
std::string getResumeId() const { return resumeId; };
uint64_t getResumeTtl() const { return resumeTtl; }
const framing::FieldTable& getArguments() const { return arguments; }
+
+ SemanticState& getParent() { return *parent; }
};
private:
@@ -147,7 +146,6 @@ class SemanticState : public sys::OutputTask,
DtxBufferMap suspendedXids;
framing::SequenceSet accumulatedAck;
boost::shared_ptr<Exchange> cacheExchange;
- sys::AggregateOutput outputTasks;
AclModule* acl;
const bool authMsg;
const string userID;
@@ -158,7 +156,6 @@ class SemanticState : public sys::OutputTask,
bool complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
- void requestDispatch(ConsumerImpl&);
void cancel(ConsumerImpl::shared_ptr);
public:
@@ -208,8 +205,6 @@ class SemanticState : public sys::OutputTask,
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
- bool hasOutput() { return outputTasks.hasOutput(); }
- bool doOutput() { return outputTasks.doOutput(); }
//final 0-10 spec (completed and accepted are distinct):
void completed(DeliveryId deliveryTag, DeliveryId endTag);
@@ -218,10 +213,11 @@ class SemanticState : public sys::OutputTask,
void attached();
void detached();
- // Used by cluster to re-create replica sessions
- static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); }
-
- template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
+ // Used by cluster to re-create sessions
+ template <class F> void eachConsumer(F f) {
+ for(ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); ++i)
+ f(i->second);
+ }
DeliveryRecords& getUnacked() { return unacked; }
framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }