summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp48
1 files changed, 41 insertions, 7 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index b6043518e8..6b352e2e65 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -166,7 +166,8 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
version(v),
- nextChannel(1)
+ nextChannel(1),
+ shutdownComplete(false)
{
QPID_LOG(debug, "ConnectionImpl created for " << version.toString());
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
@@ -181,10 +182,12 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti
const uint16_t ConnectionImpl::NEXT_CHANNEL = std::numeric_limits<uint16_t>::max();
ConnectionImpl::~ConnectionImpl() {
- // Important to close the connector first, to ensure the
- // connector thread does not call on us while the destructor
- // is running.
- if (connector) connector->close();
+ if (connector) {
+ connector->close();
+ //wait until we get the shutdown callback to ensure that the
+ //io threads will not call back on us after deletion
+ waitForShutdownComplete();
+ }
theIO().sub();
}
@@ -244,7 +247,13 @@ void ConnectionImpl::open()
connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this));
connector->setInputHandler(&handler);
connector->setShutdownHandler(this);
- connector->connect(host, port);
+ try {
+ connector->connect(host, port);
+ } catch (const std::exception& e) {
+ QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what());
+ connector.reset();
+ throw;
+ }
connector->init();
// Enable heartbeat if requested
@@ -330,9 +339,22 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) {
closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text));
}
+void ConnectionImpl::shutdown() {
+ //May need to take a temporary reference to ourselves to prevent
+ //our destructor being called until after we have notified of
+ //shutdown completion; the destructor may be called as a result of
+ //the call to failedConnection().
+ boost::shared_ptr<ConnectionImpl> temp;
+ if (!handler.isClosed()) {
+ temp = shared_from_this();
+ failedConnection();
+ }
+ notifyShutdownComplete();
+}
+
static const std::string CONN_CLOSED("Connection closed");
-void ConnectionImpl::shutdown() {
+void ConnectionImpl::failedConnection() {
if ( failureCallback )
failureCallback();
@@ -375,4 +397,16 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na
return simpl;
}
+void ConnectionImpl::waitForShutdownComplete()
+{
+ Mutex::ScopedLock l(lock);
+ while(!shutdownComplete) lock.wait();
+}
+void ConnectionImpl::notifyShutdownComplete()
+{
+ Mutex::ScopedLock l(lock);
+ shutdownComplete = true;
+ lock.notifyAll();
+}
+
}} // namespace qpid::client