diff options
| author | Gordon Sim <gsim@apache.org> | 2013-03-28 15:01:08 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-03-28 15:01:08 +0000 |
| commit | 85bfacd06a2395ba9fddc867e546efd7f4168b61 (patch) | |
| tree | 2f665b69df20862809fcb1117b1101d7f9d7a258 /qpid/cpp/src | |
| parent | 7799aa578726bd78bc71a4c94d0c4ea9f83f9fa5 (diff) | |
| download | qpid-python-85bfacd06a2395ba9fddc867e546efd7f4168b61.tar.gz | |
QPID-4674: Detect asynchronous connection close, session end and link detach
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1462138 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
4 files changed, 69 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 9febe66f7e..9036031931 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -193,6 +193,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar { { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + checkClosed(ssn, lnk); if (!lnk->capacity) { pn_link_flow(lnk->receiver, 1); wakeupDriver(); @@ -212,7 +213,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar wakeupDriver(); while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) { QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); - wait(); + wait(ssn, lnk); } if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) { pn_link_flow(lnk->receiver, lnk->capacity); @@ -247,6 +248,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared qpid::sys::AbsTime until(convert(timeout)); while (true) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + checkClosed(ssn, lnk); pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver); QPID_LOG(debug, "In ConnectionContext::get(), current=" << current); if (current) { @@ -262,7 +264,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared pn_link_advance(lnk->receiver); return true; } else if (until > qpid::sys::now()) { - wait(); + wait(ssn, lnk); } else { return false; } @@ -273,6 +275,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + checkClosed(ssn); if (message) { ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); } else { @@ -329,19 +332,20 @@ void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int c } } -void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync) +void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + checkClosed(ssn); SenderContext::Delivery* delivery(0); while (!(delivery = snd->send(message))) { QPID_LOG(debug, "Waiting for capacity..."); - wait();//wait for capacity + wait(ssn, snd);//wait for capacity } wakeupDriver(); if (sync) { while (!delivery->accepted()) { QPID_LOG(debug, "Waiting for confirmation..."); - wait();//wait until message has been confirmed + wait(ssn, snd);//wait until message has been confirmed } } } @@ -408,15 +412,65 @@ void ConnectionContext::wakeupDriver() } } +namespace { +pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; +pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED; +} + void ConnectionContext::wait() { lock.wait(); if (state == DISCONNECTED) { throw qpid::messaging::TransportFailure("Disconnected"); } - //check for any closed links, sessions or indeed the connection + if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + pn_connection_close(connection); + throw qpid::messaging::ConnectionError("Connection closed by peer"); + } +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn) +{ + wait(); + checkClosed(ssn); +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + wait(); + checkClosed(ssn, lnk); +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + wait(); + checkClosed(ssn, lnk); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn) +{ + if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + pn_session_close(ssn->session); + throw qpid::messaging::SessionError("Session ended by peer"); + } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) { + throw qpid::messaging::SessionError("Session has ended"); + } } +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + checkClosed(ssn, lnk->receiver); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + checkClosed(ssn, lnk->sender); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_link_t* lnk) +{ + checkClosed(ssn); + if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + pn_link_close(lnk); + throw qpid::messaging::LinkError("Link detached by peer"); + } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) { + throw qpid::messaging::LinkError("Link is not attached"); + } +} boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 3718184365..fbff27c288 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -72,7 +72,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void endSession(boost::shared_ptr<SessionContext>); void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); - void send(boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync); + void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync); bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative); @@ -136,6 +136,13 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag CodecSwitch codecSwitch; void wait(); + void wait(boost::shared_ptr<SessionContext>); + void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); + void checkClosed(boost::shared_ptr<SessionContext>); + void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); + void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*); void wakeupDriver(); void attach(pn_session_t*, pn_link_t*, int credit=0); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp index 4e258e7b38..cda8c032aa 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp @@ -39,7 +39,7 @@ SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c, void SenderHandle::send(const Message& message, bool sync) { - connection->send(sender, message, sync); + connection->send(session, sender, message, sync); } void SenderHandle::close() diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index bca40c6058..004ee4e775 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -156,5 +156,4 @@ bool SessionContext::settled() } return result; } - }}} // namespace qpid::messaging::amqp |
