summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp40
1 files changed, 39 insertions, 1 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index aa9eeb7489..46ef9eda1e 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -41,6 +41,31 @@ using namespace qpid::framing::connection;
using namespace qpid::sys;
using namespace qpid::framing::connection;//for connection error codes
+// Get timer singleton
+Timer& theTimer() {
+ static Mutex timerInitLock;
+ ScopedLock<Mutex> l(timerInitLock);
+
+ static qpid::sys::Timer t;
+ return t;
+}
+
+class HeartbeatTask : public TimerTask {
+ TimeoutHandler& timeout;
+
+ void fire() {
+ // If we ever get here then we have timed out
+ QPID_LOG(debug, "Traffic timeout");
+ timeout.idleIn();
+ }
+
+public:
+ HeartbeatTask(Duration p, TimeoutHandler& t) :
+ TimerTask(p),
+ timeout(t)
+ {}
+};
+
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
@@ -110,6 +135,16 @@ void ConnectionImpl::open()
connector->connect(host, port);
connector->init();
handler.waitForOpen();
+
+ // Enable heartbeat if requested
+ uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
+ if (heartbeat) {
+ // Set connection timeout to be 2x heart beat interval and setup timer
+ heartbeatTask = new HeartbeatTask(heartbeat * 2 * TIME_SEC, *this);
+ handler.setRcvTimeoutTask(heartbeatTask);
+ theTimer().add(heartbeatTask);
+ }
+
//enable security layer if one has been negotiated:
std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer();
if (securityLayer.get()) {
@@ -124,7 +159,7 @@ void ConnectionImpl::open()
void ConnectionImpl::idleIn()
{
- close();
+ connector->abort();
}
void ConnectionImpl::idleOut()
@@ -136,6 +171,9 @@ void ConnectionImpl::idleOut()
void ConnectionImpl::close()
{
if (!handler.isOpen()) return;
+ if (heartbeatTask) {
+ heartbeatTask->cancel();
+ }
handler.close();
closed(CLOSE_CODE_NORMAL, "Closed by client");
}