diff options
author | Gordon Sim <gsim@apache.org> | 2010-06-22 19:27:12 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-06-22 19:27:12 +0000 |
commit | 2f81dc3c49491cdb166afce8440c1d4cf144152e (patch) | |
tree | 0c4238d97fbae94ed724aea299a9772829256334 /cpp/src | |
parent | dfcf3677e34ee4c1aaabe9c3d39bdbceef7ece9a (diff) | |
download | qpid-python-2f81dc3c49491cdb166afce8440c1d4cf144152e.tar.gz |
QPID-2688: ensure that unacked messages are requeued before autodeletion occurs when session closes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@956988 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 41 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 1 |
3 files changed, 37 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index b8981b4877..c91cfba2f8 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -73,21 +73,34 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), userID(getSession().getConnection().getUserId()), userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), - isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())) + isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), + closeComplete(false) { acl = getSession().getBroker().getAcl(); } SemanticState::~SemanticState() { - //cancel all consumers - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - cancel(i->second); - } + closed(); +} - if (dtxBuffer.get()) { - dtxBuffer->fail(); +void SemanticState::closed() { + if (!closeComplete) { + //prevent requeued messages being redelivered to consumers + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + disable(i->second); + } + if (dtxBuffer.get()) { + dtxBuffer->fail(); + } + recover(true); + + //now unsubscribe, which may trigger queue deletion and thus + //needs to occur after the requeueing of unacked messages + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + unsubscribe(i->second); + } + closeComplete = true; } - recover(true); } bool SemanticState::exists(const string& consumerTag){ @@ -389,11 +402,15 @@ SemanticState::ConsumerImpl::~ConsumerImpl() mgmtObject->resourceDestroy (); } -void SemanticState::cancel(ConsumerImpl::shared_ptr c) +void SemanticState::disable(ConsumerImpl::shared_ptr c) { c->disableNotify(); if (session.isAttached()) session.getConnection().outputTasks.removeOutputTask(c.get()); +} + +void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c) +{ Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); @@ -403,6 +420,12 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) } } +void SemanticState::cancel(ConsumerImpl::shared_ptr c) +{ + disable(c); + unsubscribe(c); +} + void SemanticState::handle(intrusive_ptr<Message> msg) { if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index cae852732d..2b314920e6 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -157,6 +157,7 @@ class SemanticState : private boost::noncopyable { const string userID; const string userName; const bool isDefaultRealm; + bool closeComplete; void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); void checkDtxTimeout(); @@ -165,6 +166,8 @@ class SemanticState : private boost::noncopyable { AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void cancel(ConsumerImpl::shared_ptr); + void unsubscribe(ConsumerImpl::shared_ptr); + void disable(ConsumerImpl::shared_ptr); public: SemanticState(DeliveryAdapter&, SessionContext&); @@ -220,6 +223,7 @@ class SemanticState : private boost::noncopyable { void attached(); void detached(); + void closed(); // Used by cluster to re-create sessions template <class F> void eachConsumer(F f) { diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index ddf68cad2f..be4f8c7b40 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -88,6 +88,7 @@ SessionState::SessionState( } SessionState::~SessionState() { + semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); |