diff options
| author | Ted Ross <tross@apache.org> | 2012-06-06 18:33:39 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2012-06-06 18:33:39 +0000 |
| commit | d8a6f53266c5fc82c95c097666289db116433668 (patch) | |
| tree | ba178278525db5577c8a70287723470b0c241eca /qpid/cpp | |
| parent | a162590b8c306d49d699ee322f4a5f99cec70ca5 (diff) | |
| download | qpid-python-d8a6f53266c5fc82c95c097666289db116433668.tar.gz | |
QPID-4040 - Close federation links after lost heartbeats.
Applied patch from Andy Goldstein.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1347044 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 48 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 10 |
5 files changed, 61 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 89c9c3fb37..dd4baf9992 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -130,6 +130,7 @@ Broker::Options::Options(const std::string& name) : defaultMsgGroup("qpid.no-group"), timestampRcvMsgs(false), // set the 0.10 timestamp delivery property linkMaintenanceInterval(2), + linkHeartbeatInterval(120), maxNegotiateTime(2000) // 2s { int c = sys::SystemInfo::concurrency(); @@ -172,6 +173,7 @@ Broker::Options::Options(const std::string& name) : ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.") ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.") ("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS")) + ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS")) ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation") ; } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index c57e3c849c..7095383959 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -126,6 +126,7 @@ class Broker : public sys::Runnable, public Plugin::Target, std::string defaultMsgGroup; bool timestampRcvMsgs; double linkMaintenanceInterval; // FIXME aconway 2012-02-13: consistent parsing of SECONDS values. + uint16_t linkHeartbeatInterval; uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation private: diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 474472719e..03ff3d5793 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -148,6 +148,9 @@ Connection::~Connection() heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } if (!isShadow()) broker.getConnectionCounter().dec_connectionCount(); } @@ -300,6 +303,9 @@ void Connection::close(connection::CloseCode code, const string& text) heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); @@ -313,6 +319,9 @@ void Connection::sendClose() { heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -326,6 +335,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions. heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); @@ -435,6 +447,31 @@ struct ConnectionHeartbeatTask : public sys::TimerTask { } }; +class LinkHeartbeatTask : public qpid::sys::TimerTask { + sys::Timer& timer; + Connection& connection; + bool heartbeatSeen; + + void fire() { + if (!heartbeatSeen) { + QPID_LOG(error, "Federation link connection " << connection.getMgmtId() << " missed 2 heartbeats - closing connection"); + connection.abort(); + } else { + heartbeatSeen = false; + // Setup next firing + setupNextFire(); + timer.add(this); + } + } + +public: + LinkHeartbeatTask(sys::Timer& t, qpid::sys::Duration period, Connection& c) : + TimerTask(period, "LinkHeartbeatTask"), timer(t), connection(c), heartbeatSeen(false) {} + + void heartbeatReceived() { heartbeatSeen = true; } +}; + + void Connection::abort() { // Make sure that we don't try to send a heartbeat as we're @@ -460,10 +497,21 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat) } } +void Connection::startLinkHeartbeatTimeoutTask() { + if (!linkHeartbeatTimer && heartbeat > 0) { + linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this); + timer.add(linkHeartbeatTimer); + } +} + void Connection::restartTimeout() { if (timeoutTimer) timeoutTimer->touch(); + + if (linkHeartbeatTimer) { + static_cast<LinkHeartbeatTask*>(linkHeartbeatTimer.get())->heartbeatReceived(); + } } bool Connection::isOpen() { return adapter.isOpen(); } diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 1b8bd83139..42bd10c095 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -166,6 +166,7 @@ class Connection : public sys::ConnectionInputHandler, bool isOpen(); bool isLink() { return link; } + void startLinkHeartbeatTimeoutTask(); // Used by cluster during catch-up, see cluster::OutputInterceptor void doIoCallbacks(); @@ -189,7 +190,7 @@ class Connection : public sys::ConnectionInputHandler, LinkRegistry& links; management::ManagementAgent* agent; sys::Timer& timer; - boost::intrusive_ptr<sys::TimerTask> heartbeatTimer; + boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer; boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer; ErrorListener* errorListener; uint64_t objectId; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 9594ea34ef..8db136a448 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -370,8 +370,14 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax, maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); connection.setFrameMax(maxFrameSize); - connection.setHeartbeat(heartbeatMax); - proxy.tuneOk(channelMax, maxFrameSize, heartbeatMax); + // this method is only ever called when this Connection + // is a federation link where this Broker is acting as + // a client to another Broker + uint16_t hb = std::min(connection.getBroker().getOptions().linkHeartbeatInterval, heartbeatMax); + connection.setHeartbeat(hb); + connection.startLinkHeartbeatTimeoutTask(); + + proxy.tuneOk(channelMax, maxFrameSize, hb); proxy.open("/", Array(), true); } |
