summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp75
1 files changed, 53 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 4d5c4e7537..3185080f94 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -72,7 +72,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
SemanticState::~SemanticState() {
//cancel all consumers
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- cancel(*ptr_map_ptr(i));
+ cancel(i->second);
}
if (dtxBuffer.get()) {
@@ -91,16 +91,16 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire));
- queue->consume(*c, exclusive);//may throw exception
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire));
+ queue->consume(c, exclusive);//may throw exception
outputTasks.addOutputTask(c.get());
- consumers.insert(tagInOut, c.release());
+ consumers[tagInOut] = c;
}
void SemanticState::cancel(const string& tag){
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
- cancel(*ptr_map_ptr(i));
+ cancel(i->second);
consumers.erase(i);
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
@@ -260,7 +260,8 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
blocked(true),
windowing(true),
msgCredit(0),
- byteCredit(0){}
+ byteCredit(0),
+ notifyEnabled(true) {}
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -324,10 +325,11 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
SemanticState::ConsumerImpl::~ConsumerImpl() {}
-void SemanticState::cancel(ConsumerImpl& c)
+void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
- outputTasks.removeOutputTask(&c);
- Queue::shared_ptr queue = c.getQueue();
+ c->disableNotify();
+ outputTasks.removeOutputTask(c.get());
+ Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
@@ -358,10 +360,10 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
- if (acl && acl->doTransferAcl())
- {
- if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() ))
- throw NotAllowedException("ACL denied exhange publish request");
+ if (acl && acl->doTransferAcl())
+ {
+ if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() ))
+ throw NotAllowedException("ACL denied exhange publish request");
}
cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -382,7 +384,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
void SemanticState::requestDispatch()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
- requestDispatch(*ptr_map_ptr(i));
+ requestDispatch(*(i->second));
}
}
@@ -402,7 +404,7 @@ void SemanticState::complete(DeliveryRecord& delivery)
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- ptr_map_ptr(i)->complete(delivery);
+ i->second->complete(delivery);
}
}
@@ -460,7 +462,7 @@ SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
if (i == consumers.end()) {
throw NotFoundException(QPID_MSG("Unknown destination " << destination));
} else {
- return *ptr_map_ptr(i);
+ return *(i->second);
}
}
@@ -526,7 +528,7 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
void SemanticState::ConsumerImpl::flush()
{
- while(queue->dispatch(*this))
+ while(queue->dispatch(shared_from_this()))
;
stop();
}
@@ -591,19 +593,34 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
}
bool SemanticState::ConsumerImpl::hasOutput() {
- return queue->checkForMessages(*this);
+ return queue->checkForMessages(shared_from_this());
}
bool SemanticState::ConsumerImpl::doOutput()
{
- //TODO: think through properly
- return queue->dispatch(*this);
+ return queue->dispatch(shared_from_this());
+}
+
+void SemanticState::ConsumerImpl::enableNotify()
+{
+ Mutex::ScopedLock l(lock);
+ notifyEnabled = true;
+}
+
+void SemanticState::ConsumerImpl::disableNotify()
+{
+ Mutex::ScopedLock l(lock);
+ notifyEnabled = true;
}
void SemanticState::ConsumerImpl::notify()
{
- //TODO: think through properly
- parent->outputTasks.activateOutput();
+ //TODO: alter this, don't want to hold locks across external
+ //calls; for now its is required to protect the notify() from
+ //having part of the object chain of the invocation being
+ //concurrently deleted
+ Mutex::ScopedLock l(lock);
+ if (notifyEnabled) parent->outputTasks.activateOutput();
}
@@ -644,4 +661,18 @@ void SemanticState::completed(DeliveryId first, DeliveryId last)
requestDispatch();
}
+void SemanticState::attached()
+{
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ i->second->enableNotify();
+ }
+}
+
+void SemanticState::detached()
+{
+ for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
+ i->second->disableNotify();
+ }
+}
+
}} // namespace qpid::broker