diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2009-01-13 19:15:31 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2009-01-13 19:15:31 +0000 |
| commit | 766c218055567f4e387aab4678d8d1c840916465 (patch) | |
| tree | a8caf1fc75a6505124b3e6d4ed3b36de5894f50c /cpp/src/qpid/client/ConnectionHandler.cpp | |
| parent | 5fec8f487c510e2309b3bc939fea70078a11af97 (diff) | |
| download | qpid-python-766c218055567f4e387aab4678d8d1c840916465.tar.gz | |
Implement heartbeat timeout on client:
- The client shuts down a connection if
it receives no traffic on it in 2 timeout periods
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@734221 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ConnectionHandler.cpp')
| -rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 43 |
1 files changed, 29 insertions, 14 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index d5b3f2264b..d6d024cf3f 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -33,6 +33,13 @@ using namespace qpid::client; using namespace qpid::framing; using namespace qpid::framing::connection; using qpid::sys::SecurityLayer; +using qpid::sys::Duration; +using qpid::sys::TimerTask; +using qpid::sys::Timer; +using qpid::sys::AbsTime; +using qpid::sys::TIME_SEC; +using qpid::sys::ScopedLock; +using qpid::sys::Mutex; namespace { const std::string OK("OK"); @@ -60,7 +67,7 @@ CloseCode ConnectionHandler::convert(uint16_t replyCode) ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), errorCode(CLOSE_CODE_NORMAL), version(v) -{ +{ insist = true; ESTABLISHED.insert(FAILED); @@ -69,14 +76,18 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersio FINISHED.insert(FAILED); FINISHED.insert(CLOSED); -} +} void ConnectionHandler::incoming(AMQFrame& frame) { if (getState() == CLOSED) { - throw Exception("Received frame on closed connection"); + throw Exception("Received frame on closed connection"); } + if (rcvTimeoutTask) { + // Received frame on connection so delay timeout + rcvTimeoutTask->restart(); + } AMQBody* body = frame.getBody(); try { @@ -86,18 +97,18 @@ void ConnectionHandler::incoming(AMQFrame& frame) in(frame); break; case CLOSING: - QPID_LOG(warning, "Ignoring frame while closing connection: " << frame); + QPID_LOG(warning, "Ignoring frame while closing connection: " << frame); break; default: throw Exception("Cannot receive frames on non-zero channel until connection is established."); } } }catch(std::exception& e){ - QPID_LOG(warning, "Closing connection due to " << e.what()); + QPID_LOG(warning, "Closing connection due to " << e.what()); setState(CLOSING); errorCode = CLOSE_CODE_FRAMING_ERROR; errorText = e.what(); - proxy.close(501, e.what()); + proxy.close(501, e.what()); } } @@ -135,9 +146,9 @@ void ConnectionHandler::close() void ConnectionHandler::heartbeat() { - // Do nothing - the purpose of heartbeats is just to make sure that there is some - // traffic on the connection within the heart beat interval, we check for the - // traffic and don't need to do anything in response to heartbeats + // Do nothing - the purpose of heartbeats is just to make sure that there is some + // traffic on the connection within the heart beat interval, we check for the + // traffic and don't need to do anything in response to heartbeats } void ConnectionHandler::checkState(STATES s, const std::string& msg) @@ -175,7 +186,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me if (i != mechanisms.begin()) mechlist += SPACE; mechlist += (*i)->get<std::string>(); } - } + } if (!chosenMechanismSupported) { fail("Selected mechanism not supported: " + mechanism); @@ -210,11 +221,10 @@ void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSize // Clip the requested heartbeat to the maximum/minimum offered uint16_t heartbeat = ConnectionSettings::heartbeat; heartbeat = heartbeat < heartbeatMin ? heartbeatMin : - heartbeat > heartbeatMax ? heartbeatMax : - heartbeat; + heartbeat > heartbeatMax ? heartbeatMax : + heartbeat; + ConnectionSettings::heartbeat = heartbeat; proxy.tuneOk(maxChannels, maxFrameSize, heartbeat); - // TODO set connection timeout to be 2x heart beat interval - // TODO and set an alarm for it. setState(OPENING); proxy.open(virtualhost, capabilities, insist); } @@ -279,3 +289,8 @@ std::auto_ptr<qpid::sys::SecurityLayer> ConnectionHandler::getSecurityLayer() { return securityLayer; } + +void ConnectionHandler::setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask> t) +{ + rcvTimeoutTask = t; +} |
