diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2009-01-13 19:08:29 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2009-01-13 19:08:29 +0000 |
| commit | 5fec8f487c510e2309b3bc939fea70078a11af97 (patch) | |
| tree | f875a150066a08094eba72856de99e471185a0aa /cpp/src/qpid/broker/Connection.cpp | |
| parent | 77bde150020c6c34130523f528d739c264aa12e3 (diff) | |
| download | qpid-python-5fec8f487c510e2309b3bc939fea70078a11af97.tar.gz | |
Send heartbeat from broker to client
- Server sends possible heartbeat range and client replies with desired
heartbeat as part of the tune-tuneOk exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@734220 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 46 |
1 files changed, 44 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index eb54ddfd56..66ee6281c6 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -38,7 +38,6 @@ using namespace qpid::sys; using namespace qpid::framing; -using namespace qpid::sys; using qpid::ptr_map_ptr; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; @@ -57,7 +56,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtId(mgmtId_), mgmtObject(0), links(broker_.getLinks()), - agent(0) + agent(0), + timer(broker_.getTimer()) { Manageable* parent = broker.GetVhostObject(); @@ -92,6 +92,9 @@ Connection::~Connection() } if (isLink) links.notifyClosed(mgmtId); + + if (heartbeatTimer) + heartbeatTimer->cancel(); } void Connection::received(framing::AMQFrame& frame) { @@ -174,6 +177,8 @@ void Connection::setFederationLink(bool b) void Connection::close(connection::CloseCode code, const string& text) { QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")"); + if (heartbeatTimer) + heartbeatTimer->cancel(); adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); @@ -183,6 +188,8 @@ void Connection::close(connection::CloseCode code, const string& text) // Send a close to the client but keep the channels. Used by cluster. void Connection::sendClose() { + if (heartbeatTimer) + heartbeatTimer->cancel(); adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -230,6 +237,10 @@ bool Connection::doOutput() { return false; } +void Connection::sendHeartbeat() { + adapter.heartbeat(); +} + void Connection::closeChannel(uint16_t id) { ChannelMap::iterator i = channels.find(id); if (i != channels.end()) channels.erase(i); @@ -272,5 +283,36 @@ void Connection::setSecureConnection(SecureConnection* s) adapter.setSecureConnection(s); } +struct ConnectionHeartbeatTask : public TimerTask { + Timer& timer; + Connection& connection; + ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : + TimerTask(Duration(hb*TIME_SEC)), + timer(t), + connection(c) + {} + + void fire() { + // This is the best we can currently do to avoid a destruction/fire race + if (!isCancelled()) { + // Setup next firing + reset(); + timer.add(this); + + // Send Heartbeat + connection.sendHeartbeat(); + } + } +}; + +void Connection::setHeartbeatInterval(uint16_t heartbeat) +{ + setHeartbeat(heartbeat); + if (heartbeat > 0) { + heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); + timer.add(heartbeatTimer); + } +} + }} |
