summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.h
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-09-09 17:15:17 +0000
committerGordon Sim <gsim@apache.org>2008-09-09 17:15:17 +0000
commit62dbd3afff76a6da41cd9e1aee8ce11518f22fca (patch)
treec15b11bcd62cdb011ff0826dbf2c5e23530fd255 /cpp/src/qpid/broker/SemanticState.h
parent833cf68a5bf58e882f377d144768ceb546e5e036 (diff)
downloadqpid-python-62dbd3afff76a6da41cd9e1aee8ce11518f22fca.tar.gz
QPID-1261: initial fix (this degrades performance for shared queues with more than one consumer; I'll work on fixing that asap). This also moves the lock refered to in QQPID-1265 which I will update accordingly.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@693518 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.h')
-rw-r--r--cpp/src/qpid/broker/SemanticState.h18
1 files changed, 15 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index e03d5ec89b..1d32d8aa50 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -37,6 +37,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/shared_ptr.h"
#include "AclModule.h"
@@ -58,8 +59,10 @@ class SessionContext;
class SemanticState : public sys::OutputTask,
private boost::noncopyable
{
- class ConsumerImpl : public Consumer, public sys::OutputTask
+ class ConsumerImpl : public Consumer, public sys::OutputTask,
+ public boost::enable_shared_from_this<ConsumerImpl>
{
+ qpid::sys::Mutex lock;
SemanticState* const parent;
const DeliveryToken::shared_ptr token;
const string name;
@@ -71,11 +74,14 @@ class SemanticState : public sys::OutputTask,
bool windowing;
uint32_t msgCredit;
uint32_t byteCredit;
+ bool notifyEnabled;
bool checkCredit(boost::intrusive_ptr<Message>& msg);
void allocateCredit(boost::intrusive_ptr<Message>& msg);
public:
+ typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
+
ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
const string& name, Queue::shared_ptr queue,
bool ack, bool nolocal, bool acquire);
@@ -84,6 +90,9 @@ class SemanticState : public sys::OutputTask,
bool deliver(QueuedMessage& msg);
bool filter(boost::intrusive_ptr<Message> msg);
bool accept(boost::intrusive_ptr<Message> msg);
+
+ void disableNotify();
+ void enableNotify();
void notify();
void setWindowMode();
@@ -100,7 +109,7 @@ class SemanticState : public sys::OutputTask,
bool doOutput();
};
- typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
+ typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
SessionContext& session;
@@ -130,7 +139,7 @@ class SemanticState : public sys::OutputTask,
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void requestDispatch(ConsumerImpl&);
- void cancel(ConsumerImpl&);
+ void cancel(ConsumerImpl::shared_ptr);
public:
SemanticState(DeliveryAdapter&, SessionContext&);
@@ -187,6 +196,9 @@ class SemanticState : public sys::OutputTask,
//final 0-10 spec (completed and accepted are distinct):
void completed(DeliveryId deliveryTag, DeliveryId endTag);
void accepted(DeliveryId deliveryTag, DeliveryId endTag);
+
+ void attached();
+ void detached();
};
}} // namespace qpid::broker