diff options
| author | Gordon Sim <gsim@apache.org> | 2013-08-26 11:30:20 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-08-26 11:30:20 +0000 |
| commit | 692b5b5e56bfeb6d7bf96a7865e5c8b4bc86cc43 (patch) | |
| tree | 4f77d661b5ae5120c864733755363a8f5e2ec37c /qpid/cpp/src | |
| parent | 8e3f27a5b32c17e70ce75aea68343e19b243a05b (diff) | |
| download | qpid-python-692b5b5e56bfeb6d7bf96a7865e5c8b4bc86cc43.tar.gz | |
QPID-5098: better handling of link failures and close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1517498 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
9 files changed, 84 insertions, 26 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index c9b7c1a1c4..6ca06cc649 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -147,14 +147,19 @@ bool ConnectionContext::isOpen() const void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - //wait for outstanding sends to settle - while (!ssn->settled()) { - QPID_LOG(debug, "Waiting for sends to settle before closing"); - wait(ssn);//wait until message has been confirmed + if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { + //wait for outstanding sends to settle + while (!ssn->settled()) { + QPID_LOG(debug, "Waiting for sends to settle before closing"); + wait(ssn);//wait until message has been confirmed + } + } + + if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { + pn_session_close(ssn->session); } + sessions.erase(ssn->getName()); - pn_session_close(ssn->session); - //TODO: need to destroy session and remove context from map wakeupDriver(); } @@ -290,6 +295,31 @@ void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid: wakeupDriver(); } +void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) { + lnk->close(); + } + wakeupDriver(); + while (pn_link_state(lnk->sender) & PN_REMOTE_ACTIVE) { + wait(); + } + ssn->removeSender(lnk->getName()); +} + +void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) { + lnk->close(); + } + wakeupDriver(); + while (pn_link_state(lnk->receiver) & PN_REMOTE_ACTIVE) { + wait(); + } + ssn->removeReceiver(lnk->getName()); +} void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { @@ -521,13 +551,14 @@ boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transaction SessionMap::const_iterator i = sessions.find(name); if (i == sessions.end()) { boost::shared_ptr<SessionContext> s(new SessionContext(connection)); + s->setName(name); s->session = pn_session(connection); pn_session_open(s->session); - sessions[name] = s; wakeupDriver(); while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { wait(); } + sessions[name] = s; return s; } else { throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index d8a00ea147..35948b65cf 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -75,6 +75,8 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void endSession(boost::shared_ptr<SessionContext>); void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); + void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync); bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp index c601d05ed0..177f47896b 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverHandle.cpp @@ -84,7 +84,7 @@ uint32_t ReceiverHandle::getUnsettled() void ReceiverHandle::close() { - session->closeReceiver(getName()); + connection->detach(session, receiver); } const std::string& ReceiverHandle::getName() const diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 1043a0c62c..92a8941571 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -67,7 +67,7 @@ uint32_t SenderContext::getCapacity() uint32_t SenderContext::getUnsettled() { - return processUnsettled(); + return processUnsettled(true/*always allow retrieval of unsettled count, even if link has failed*/); } const std::string& SenderContext::getName() const @@ -82,7 +82,7 @@ const std::string& SenderContext::getTarget() const SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message) { - if (processUnsettled() < capacity && pn_link_credit(sender)) { + if (processUnsettled(false) < capacity && pn_link_credit(sender)) { deliveries.push_back(Delivery(nextId++)); Delivery& delivery = deliveries.back(); delivery.encode(MessageImplAccess::get(message), address); @@ -108,11 +108,13 @@ void SenderContext::check() } } -uint32_t SenderContext::processUnsettled() +uint32_t SenderContext::processUnsettled(bool silent) { - check(); + if (!silent) { + check(); + } //remove messages from front of deque once peer has confirmed receipt - while (!deliveries.empty() && deliveries.front().delivered()) { + while (!deliveries.empty() && deliveries.front().delivered() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) { deliveries.front().settle(); deliveries.pop_front(); } @@ -529,7 +531,7 @@ void SenderContext::configure(pn_terminus_t* target) bool SenderContext::settled() { - return processUnsettled() == 0; + return processUnsettled(false) == 0; } Address SenderContext::getAddress() const diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index e389cd2e35..4d73d38afe 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -89,7 +89,7 @@ class SenderContext Deliveries deliveries; uint32_t capacity; - uint32_t processUnsettled(); + uint32_t processUnsettled(bool silent); void configure(pn_terminus_t*); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp index cda8c032aa..367db701cb 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp @@ -44,7 +44,7 @@ void SenderHandle::send(const Message& message, bool sync) void SenderHandle::close() { - session->closeSender(getName()); + connection->detach(session, sender); } void SenderHandle::setCapacity(uint32_t capacity) diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 9815721fa0..8170bc74b8 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -79,14 +79,14 @@ boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string } } -void SessionContext::closeReceiver(const std::string&) +void SessionContext::removeReceiver(const std::string& n) { - + receivers.erase(n); } -void SessionContext::closeSender(const std::string&) +void SessionContext::removeSender(const std::string& n) { - + senders.erase(n); } boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/) @@ -153,4 +153,14 @@ bool SessionContext::settled() } return result; } + +void SessionContext::setName(const std::string& n) +{ + name = n; +} +std::string SessionContext::getName() const +{ + return name; +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index eca30a0e97..7a0e0fb23e 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -54,12 +54,14 @@ class SessionContext boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address); boost::shared_ptr<SenderContext> getSender(const std::string& name) const; boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const; - void closeReceiver(const std::string&); - void closeSender(const std::string&); + void removeReceiver(const std::string&); + void removeSender(const std::string&); boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout); uint32_t getReceivable(); uint32_t getUnsettledAcks(); bool settled(); + void setName(const std::string&); + std::string getName() const; private: friend class ConnectionContext; typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap; @@ -70,6 +72,7 @@ class SessionContext ReceiverMap receivers; DeliveryMap unacked; qpid::framing::SequenceNumber next; + std::string name; qpid::framing::SequenceNumber record(pn_delivery_t*); void acknowledge(); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp index bf79771ca4..45635e4ced 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -84,15 +84,25 @@ void SessionHandle::sync(bool /*block*/) qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address) { boost::shared_ptr<SenderContext> sender = session->createSender(address); - connection->attach(session, sender); - return qpid::messaging::Sender(new SenderHandle(connection, session, sender)); + try { + connection->attach(session, sender); + return qpid::messaging::Sender(new SenderHandle(connection, session, sender)); + } catch (...) { + session->removeSender(sender->getName()); + throw; + } } qpid::messaging::Receiver SessionHandle::createReceiver(const qpid::messaging::Address& address) { boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address); - connection->attach(session, receiver); - return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver)); + try { + connection->attach(session, receiver); + return qpid::messaging::Receiver(new ReceiverHandle(connection, session, receiver)); + } catch (...) { + session->removeReceiver(receiver->getName()); + throw; + } } bool SessionHandle::nextReceiver(Receiver& receiver, Duration timeout) |
