diff options
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 40 |
1 files changed, 39 insertions, 1 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index aa9eeb7489..46ef9eda1e 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -41,6 +41,31 @@ using namespace qpid::framing::connection; using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes +// Get timer singleton +Timer& theTimer() { + static Mutex timerInitLock; + ScopedLock<Mutex> l(timerInitLock); + + static qpid::sys::Timer t; + return t; +} + +class HeartbeatTask : public TimerTask { + TimeoutHandler& timeout; + + void fire() { + // If we ever get here then we have timed out + QPID_LOG(debug, "Traffic timeout"); + timeout.idleIn(); + } + +public: + HeartbeatTask(Duration p, TimeoutHandler& t) : + TimerTask(p), + timeout(t) + {} +}; + ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), @@ -110,6 +135,16 @@ void ConnectionImpl::open() connector->connect(host, port); connector->init(); handler.waitForOpen(); + + // Enable heartbeat if requested + uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat; + if (heartbeat) { + // Set connection timeout to be 2x heart beat interval and setup timer + heartbeatTask = new HeartbeatTask(heartbeat * 2 * TIME_SEC, *this); + handler.setRcvTimeoutTask(heartbeatTask); + theTimer().add(heartbeatTask); + } + //enable security layer if one has been negotiated: std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer(); if (securityLayer.get()) { @@ -124,7 +159,7 @@ void ConnectionImpl::open() void ConnectionImpl::idleIn() { - close(); + connector->abort(); } void ConnectionImpl::idleOut() @@ -136,6 +171,9 @@ void ConnectionImpl::idleOut() void ConnectionImpl::close() { if (!handler.isOpen()) return; + if (heartbeatTask) { + heartbeatTask->cancel(); + } handler.close(); closed(CLOSE_CODE_NORMAL, "Closed by client"); } |
