diff options
| author | Gordon Sim <gsim@apache.org> | 2008-09-09 17:15:17 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-09-09 17:15:17 +0000 |
| commit | 62dbd3afff76a6da41cd9e1aee8ce11518f22fca (patch) | |
| tree | c15b11bcd62cdb011ff0826dbf2c5e23530fd255 /cpp/src/qpid/broker/SemanticState.h | |
| parent | 833cf68a5bf58e882f377d144768ceb546e5e036 (diff) | |
| download | qpid-python-62dbd3afff76a6da41cd9e1aee8ce11518f22fca.tar.gz | |
QPID-1261: initial fix (this degrades performance for shared queues with more than one consumer; I'll work on fixing that asap). This also moves the lock refered to in QQPID-1265 which I will update accordingly.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@693518 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.h')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index e03d5ec89b..1d32d8aa50 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -37,6 +37,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/AggregateOutput.h" +#include "qpid/sys/Mutex.h" #include "qpid/shared_ptr.h" #include "AclModule.h" @@ -58,8 +59,10 @@ class SessionContext; class SemanticState : public sys::OutputTask, private boost::noncopyable { - class ConsumerImpl : public Consumer, public sys::OutputTask + class ConsumerImpl : public Consumer, public sys::OutputTask, + public boost::enable_shared_from_this<ConsumerImpl> { + qpid::sys::Mutex lock; SemanticState* const parent; const DeliveryToken::shared_ptr token; const string name; @@ -71,11 +74,14 @@ class SemanticState : public sys::OutputTask, bool windowing; uint32_t msgCredit; uint32_t byteCredit; + bool notifyEnabled; bool checkCredit(boost::intrusive_ptr<Message>& msg); void allocateCredit(boost::intrusive_ptr<Message>& msg); public: + typedef boost::shared_ptr<ConsumerImpl> shared_ptr; + ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, const string& name, Queue::shared_ptr queue, bool ack, bool nolocal, bool acquire); @@ -84,6 +90,9 @@ class SemanticState : public sys::OutputTask, bool deliver(QueuedMessage& msg); bool filter(boost::intrusive_ptr<Message> msg); bool accept(boost::intrusive_ptr<Message> msg); + + void disableNotify(); + void enableNotify(); void notify(); void setWindowMode(); @@ -100,7 +109,7 @@ class SemanticState : public sys::OutputTask, bool doOutput(); }; - typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap; + typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; SessionContext& session; @@ -130,7 +139,7 @@ class SemanticState : public sys::OutputTask, AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void requestDispatch(ConsumerImpl&); - void cancel(ConsumerImpl&); + void cancel(ConsumerImpl::shared_ptr); public: SemanticState(DeliveryAdapter&, SessionContext&); @@ -187,6 +196,9 @@ class SemanticState : public sys::OutputTask, //final 0-10 spec (completed and accepted are distinct): void completed(DeliveryId deliveryTag, DeliveryId endTag); void accepted(DeliveryId deliveryTag, DeliveryId endTag); + + void attached(); + void detached(); }; }} // namespace qpid::broker |
