diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 75 |
1 files changed, 53 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 4d5c4e7537..3185080f94 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -72,7 +72,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) SemanticState::~SemanticState() { //cancel all consumers for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - cancel(*ptr_map_ptr(i)); + cancel(i->second); } if (dtxBuffer.get()) { @@ -91,16 +91,16 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire)); - queue->consume(*c, exclusive);//may throw exception + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire)); + queue->consume(c, exclusive);//may throw exception outputTasks.addOutputTask(c.get()); - consumers.insert(tagInOut, c.release()); + consumers[tagInOut] = c; } void SemanticState::cancel(const string& tag){ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { - cancel(*ptr_map_ptr(i)); + cancel(i->second); consumers.erase(i); //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery @@ -260,7 +260,8 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, blocked(true), windowing(true), msgCredit(0), - byteCredit(0){} + byteCredit(0), + notifyEnabled(true) {} OwnershipToken* SemanticState::ConsumerImpl::getSession() { @@ -324,10 +325,11 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) SemanticState::ConsumerImpl::~ConsumerImpl() {} -void SemanticState::cancel(ConsumerImpl& c) +void SemanticState::cancel(ConsumerImpl::shared_ptr c) { - outputTasks.removeOutputTask(&c); - Queue::shared_ptr queue = c.getQueue(); + c->disableNotify(); + outputTasks.removeOutputTask(c.get()); + Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { @@ -358,10 +360,10 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { cacheExchange = session.getBroker().getExchanges().get(exchangeName); } - if (acl && acl->doTransferAcl()) - { - if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() )) - throw NotAllowedException("ACL denied exhange publish request"); + if (acl && acl->doTransferAcl()) + { + if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() )) + throw NotAllowedException("ACL denied exhange publish request"); } cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); @@ -382,7 +384,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { void SemanticState::requestDispatch() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - requestDispatch(*ptr_map_ptr(i)); + requestDispatch(*(i->second)); } } @@ -402,7 +404,7 @@ void SemanticState::complete(DeliveryRecord& delivery) delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - ptr_map_ptr(i)->complete(delivery); + i->second->complete(delivery); } } @@ -460,7 +462,7 @@ SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) if (i == consumers.end()) { throw NotFoundException(QPID_MSG("Unknown destination " << destination)); } else { - return *ptr_map_ptr(i); + return *(i->second); } } @@ -526,7 +528,7 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) void SemanticState::ConsumerImpl::flush() { - while(queue->dispatch(*this)) + while(queue->dispatch(shared_from_this())) ; stop(); } @@ -591,19 +593,34 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) } bool SemanticState::ConsumerImpl::hasOutput() { - return queue->checkForMessages(*this); + return queue->checkForMessages(shared_from_this()); } bool SemanticState::ConsumerImpl::doOutput() { - //TODO: think through properly - return queue->dispatch(*this); + return queue->dispatch(shared_from_this()); +} + +void SemanticState::ConsumerImpl::enableNotify() +{ + Mutex::ScopedLock l(lock); + notifyEnabled = true; +} + +void SemanticState::ConsumerImpl::disableNotify() +{ + Mutex::ScopedLock l(lock); + notifyEnabled = true; } void SemanticState::ConsumerImpl::notify() { - //TODO: think through properly - parent->outputTasks.activateOutput(); + //TODO: alter this, don't want to hold locks across external + //calls; for now its is required to protect the notify() from + //having part of the object chain of the invocation being + //concurrently deleted + Mutex::ScopedLock l(lock); + if (notifyEnabled) parent->outputTasks.activateOutput(); } @@ -644,4 +661,18 @@ void SemanticState::completed(DeliveryId first, DeliveryId last) requestDispatch(); } +void SemanticState::attached() +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + i->second->enableNotify(); + } +} + +void SemanticState::detached() +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + i->second->disableNotify(); + } +} + }} // namespace qpid::broker |