summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-03-28 15:01:08 +0000
committerGordon Sim <gsim@apache.org>2013-03-28 15:01:08 +0000
commit85bfacd06a2395ba9fddc867e546efd7f4168b61 (patch)
tree2f665b69df20862809fcb1117b1101d7f9d7a258 /qpid/cpp/src
parent7799aa578726bd78bc71a4c94d0c4ea9f83f9fa5 (diff)
downloadqpid-python-85bfacd06a2395ba9fddc867e546efd7f4168b61.tar.gz
QPID-4674: Detect asynchronous connection close, session end and link detach
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1462138 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp66
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h9
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp1
4 files changed, 69 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 9febe66f7e..9036031931 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -193,6 +193,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
{
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn, lnk);
if (!lnk->capacity) {
pn_link_flow(lnk->receiver, 1);
wakeupDriver();
@@ -212,7 +213,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
wakeupDriver();
while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
- wait();
+ wait(ssn, lnk);
}
if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
pn_link_flow(lnk->receiver, lnk->capacity);
@@ -247,6 +248,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
qpid::sys::AbsTime until(convert(timeout));
while (true) {
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn, lnk);
pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver);
QPID_LOG(debug, "In ConnectionContext::get(), current=" << current);
if (current) {
@@ -262,7 +264,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
pn_link_advance(lnk->receiver);
return true;
} else if (until > qpid::sys::now()) {
- wait();
+ wait(ssn, lnk);
} else {
return false;
}
@@ -273,6 +275,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
if (message) {
ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative);
} else {
@@ -329,19 +332,20 @@ void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int c
}
}
-void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
+void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
SenderContext::Delivery* delivery(0);
while (!(delivery = snd->send(message))) {
QPID_LOG(debug, "Waiting for capacity...");
- wait();//wait for capacity
+ wait(ssn, snd);//wait for capacity
}
wakeupDriver();
if (sync) {
while (!delivery->accepted()) {
QPID_LOG(debug, "Waiting for confirmation...");
- wait();//wait until message has been confirmed
+ wait(ssn, snd);//wait until message has been confirmed
}
}
}
@@ -408,15 +412,65 @@ void ConnectionContext::wakeupDriver()
}
}
+namespace {
+pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
+pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED;
+}
+
void ConnectionContext::wait()
{
lock.wait();
if (state == DISCONNECTED) {
throw qpid::messaging::TransportFailure("Disconnected");
}
- //check for any closed links, sessions or indeed the connection
+ if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_connection_close(connection);
+ throw qpid::messaging::ConnectionError("Connection closed by peer");
+ }
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn)
+{
+ wait();
+ checkClosed(ssn);
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ wait();
+ checkClosed(ssn, lnk);
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+ wait();
+ checkClosed(ssn, lnk);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
+{
+ if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_session_close(ssn->session);
+ throw qpid::messaging::SessionError("Session ended by peer");
+ } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) {
+ throw qpid::messaging::SessionError("Session has ended");
+ }
}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ checkClosed(ssn, lnk->receiver);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+ checkClosed(ssn, lnk->sender);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_link_t* lnk)
+{
+ checkClosed(ssn);
+ if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_link_close(lnk);
+ throw qpid::messaging::LinkError("Link detached by peer");
+ } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) {
+ throw qpid::messaging::LinkError("Link is not attached");
+ }
+}
boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 3718184365..fbff27c288 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -72,7 +72,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void endSession(boost::shared_ptr<SessionContext>);
void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
- void send(boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
+ void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
@@ -136,6 +136,13 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
CodecSwitch codecSwitch;
void wait();
+ void wait(boost::shared_ptr<SessionContext>);
+ void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+ void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+ void checkClosed(boost::shared_ptr<SessionContext>);
+ void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+ void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
+ void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
void wakeupDriver();
void attach(pn_session_t*, pn_link_t*, int credit=0);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
index 4e258e7b38..cda8c032aa 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
@@ -39,7 +39,7 @@ SenderHandle::SenderHandle(boost::shared_ptr<ConnectionContext> c,
void SenderHandle::send(const Message& message, bool sync)
{
- connection->send(sender, message, sync);
+ connection->send(session, sender, message, sync);
}
void SenderHandle::close()
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index bca40c6058..004ee4e775 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -156,5 +156,4 @@ bool SessionContext::settled()
}
return result;
}
-
}}} // namespace qpid::messaging::amqp