summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionHandler.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-01-13 19:15:31 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-01-13 19:15:31 +0000
commit766c218055567f4e387aab4678d8d1c840916465 (patch)
treea8caf1fc75a6505124b3e6d4ed3b36de5894f50c /cpp/src/qpid/client/ConnectionHandler.cpp
parent5fec8f487c510e2309b3bc939fea70078a11af97 (diff)
downloadqpid-python-766c218055567f4e387aab4678d8d1c840916465.tar.gz
Implement heartbeat timeout on client:
- The client shuts down a connection if it receives no traffic on it in 2 timeout periods git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@734221 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ConnectionHandler.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp43
1 files changed, 29 insertions, 14 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index d5b3f2264b..d6d024cf3f 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -33,6 +33,13 @@ using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::framing::connection;
using qpid::sys::SecurityLayer;
+using qpid::sys::Duration;
+using qpid::sys::TimerTask;
+using qpid::sys::Timer;
+using qpid::sys::AbsTime;
+using qpid::sys::TIME_SEC;
+using qpid::sys::ScopedLock;
+using qpid::sys::Mutex;
namespace {
const std::string OK("OK");
@@ -60,7 +67,7 @@ CloseCode ConnectionHandler::convert(uint16_t replyCode)
ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v)
: StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler),
errorCode(CLOSE_CODE_NORMAL), version(v)
-{
+{
insist = true;
ESTABLISHED.insert(FAILED);
@@ -69,14 +76,18 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersio
FINISHED.insert(FAILED);
FINISHED.insert(CLOSED);
-}
+}
void ConnectionHandler::incoming(AMQFrame& frame)
{
if (getState() == CLOSED) {
- throw Exception("Received frame on closed connection");
+ throw Exception("Received frame on closed connection");
}
+ if (rcvTimeoutTask) {
+ // Received frame on connection so delay timeout
+ rcvTimeoutTask->restart();
+ }
AMQBody* body = frame.getBody();
try {
@@ -86,18 +97,18 @@ void ConnectionHandler::incoming(AMQFrame& frame)
in(frame);
break;
case CLOSING:
- QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
+ QPID_LOG(warning, "Ignoring frame while closing connection: " << frame);
break;
default:
throw Exception("Cannot receive frames on non-zero channel until connection is established.");
}
}
}catch(std::exception& e){
- QPID_LOG(warning, "Closing connection due to " << e.what());
+ QPID_LOG(warning, "Closing connection due to " << e.what());
setState(CLOSING);
errorCode = CLOSE_CODE_FRAMING_ERROR;
errorText = e.what();
- proxy.close(501, e.what());
+ proxy.close(501, e.what());
}
}
@@ -135,9 +146,9 @@ void ConnectionHandler::close()
void ConnectionHandler::heartbeat()
{
- // Do nothing - the purpose of heartbeats is just to make sure that there is some
- // traffic on the connection within the heart beat interval, we check for the
- // traffic and don't need to do anything in response to heartbeats
+ // Do nothing - the purpose of heartbeats is just to make sure that there is some
+ // traffic on the connection within the heart beat interval, we check for the
+ // traffic and don't need to do anything in response to heartbeats
}
void ConnectionHandler::checkState(STATES s, const std::string& msg)
@@ -175,7 +186,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me
if (i != mechanisms.begin()) mechlist += SPACE;
mechlist += (*i)->get<std::string>();
}
- }
+ }
if (!chosenMechanismSupported) {
fail("Selected mechanism not supported: " + mechanism);
@@ -210,11 +221,10 @@ void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSize
// Clip the requested heartbeat to the maximum/minimum offered
uint16_t heartbeat = ConnectionSettings::heartbeat;
heartbeat = heartbeat < heartbeatMin ? heartbeatMin :
- heartbeat > heartbeatMax ? heartbeatMax :
- heartbeat;
+ heartbeat > heartbeatMax ? heartbeatMax :
+ heartbeat;
+ ConnectionSettings::heartbeat = heartbeat;
proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
- // TODO set connection timeout to be 2x heart beat interval
- // TODO and set an alarm for it.
setState(OPENING);
proxy.open(virtualhost, capabilities, insist);
}
@@ -279,3 +289,8 @@ std::auto_ptr<qpid::sys::SecurityLayer> ConnectionHandler::getSecurityLayer()
{
return securityLayer;
}
+
+void ConnectionHandler::setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask> t)
+{
+ rcvTimeoutTask = t;
+}