diff options
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 48 |
1 files changed, 41 insertions, 7 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index b6043518e8..6b352e2e65 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -166,7 +166,8 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), version(v), - nextChannel(1) + nextChannel(1), + shutdownComplete(false) { QPID_LOG(debug, "ConnectionImpl created for " << version.toString()); handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); @@ -181,10 +182,12 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti const uint16_t ConnectionImpl::NEXT_CHANNEL = std::numeric_limits<uint16_t>::max(); ConnectionImpl::~ConnectionImpl() { - // Important to close the connector first, to ensure the - // connector thread does not call on us while the destructor - // is running. - if (connector) connector->close(); + if (connector) { + connector->close(); + //wait until we get the shutdown callback to ensure that the + //io threads will not call back on us after deletion + waitForShutdownComplete(); + } theIO().sub(); } @@ -244,7 +247,13 @@ void ConnectionImpl::open() connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); - connector->connect(host, port); + try { + connector->connect(host, port); + } catch (const std::exception& e) { + QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what()); + connector.reset(); + throw; + } connector->init(); // Enable heartbeat if requested @@ -330,9 +339,22 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) { closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } +void ConnectionImpl::shutdown() { + //May need to take a temporary reference to ourselves to prevent + //our destructor being called until after we have notified of + //shutdown completion; the destructor may be called as a result of + //the call to failedConnection(). + boost::shared_ptr<ConnectionImpl> temp; + if (!handler.isClosed()) { + temp = shared_from_this(); + failedConnection(); + } + notifyShutdownComplete(); +} + static const std::string CONN_CLOSED("Connection closed"); -void ConnectionImpl::shutdown() { +void ConnectionImpl::failedConnection() { if ( failureCallback ) failureCallback(); @@ -375,4 +397,16 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na return simpl; } +void ConnectionImpl::waitForShutdownComplete() +{ + Mutex::ScopedLock l(lock); + while(!shutdownComplete) lock.wait(); +} +void ConnectionImpl::notifyShutdownComplete() +{ + Mutex::ScopedLock l(lock); + shutdownComplete = true; + lock.notifyAll(); +} + }} // namespace qpid::client |
