From 62dbd3afff76a6da41cd9e1aee8ce11518f22fca Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 9 Sep 2008 17:15:17 +0000 Subject: QPID-1261: initial fix (this degrades performance for shared queues with more than one consumer; I'll work on fixing that asap). This also moves the lock refered to in QQPID-1265 which I will update accordingly. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@693518 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/SemanticState.h | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) (limited to 'cpp/src/qpid/broker/SemanticState.h') diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index e03d5ec89b..1d32d8aa50 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -37,6 +37,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/AggregateOutput.h" +#include "qpid/sys/Mutex.h" #include "qpid/shared_ptr.h" #include "AclModule.h" @@ -58,8 +59,10 @@ class SessionContext; class SemanticState : public sys::OutputTask, private boost::noncopyable { - class ConsumerImpl : public Consumer, public sys::OutputTask + class ConsumerImpl : public Consumer, public sys::OutputTask, + public boost::enable_shared_from_this { + qpid::sys::Mutex lock; SemanticState* const parent; const DeliveryToken::shared_ptr token; const string name; @@ -71,11 +74,14 @@ class SemanticState : public sys::OutputTask, bool windowing; uint32_t msgCredit; uint32_t byteCredit; + bool notifyEnabled; bool checkCredit(boost::intrusive_ptr& msg); void allocateCredit(boost::intrusive_ptr& msg); public: + typedef boost::shared_ptr shared_ptr; + ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, const string& name, Queue::shared_ptr queue, bool ack, bool nolocal, bool acquire); @@ -84,6 +90,9 @@ class SemanticState : public sys::OutputTask, bool deliver(QueuedMessage& msg); bool filter(boost::intrusive_ptr msg); bool accept(boost::intrusive_ptr msg); + + void disableNotify(); + void enableNotify(); void notify(); void setWindowMode(); @@ -100,7 +109,7 @@ class SemanticState : public sys::OutputTask, bool doOutput(); }; - typedef boost::ptr_map ConsumerImplMap; + typedef std::map ConsumerImplMap; typedef std::map DtxBufferMap; SessionContext& session; @@ -130,7 +139,7 @@ class SemanticState : public sys::OutputTask, AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void requestDispatch(ConsumerImpl&); - void cancel(ConsumerImpl&); + void cancel(ConsumerImpl::shared_ptr); public: SemanticState(DeliveryAdapter&, SessionContext&); @@ -187,6 +196,9 @@ class SemanticState : public sys::OutputTask, //final 0-10 spec (completed and accepted are distinct): void completed(DeliveryId deliveryTag, DeliveryId endTag); void accepted(DeliveryId deliveryTag, DeliveryId endTag); + + void attached(); + void detached(); }; }} // namespace qpid::broker -- cgit v1.2.1