summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2012-06-06 18:33:39 +0000
committerTed Ross <tross@apache.org>2012-06-06 18:33:39 +0000
commitd8a6f53266c5fc82c95c097666289db116433668 (patch)
treeba178278525db5577c8a70287723470b0c241eca /qpid/cpp
parenta162590b8c306d49d699ee322f4a5f99cec70ca5 (diff)
downloadqpid-python-d8a6f53266c5fc82c95c097666289db116433668.tar.gz
QPID-4040 - Close federation links after lost heartbeats.
Applied patch from Andy Goldstein. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1347044 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp48
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h3
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp10
5 files changed, 61 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 89c9c3fb37..dd4baf9992 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -130,6 +130,7 @@ Broker::Options::Options(const std::string& name) :
defaultMsgGroup("qpid.no-group"),
timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
linkMaintenanceInterval(2),
+ linkHeartbeatInterval(120),
maxNegotiateTime(2000) // 2s
{
int c = sys::SystemInfo::concurrency();
@@ -172,6 +173,7 @@ Broker::Options::Options(const std::string& name) :
("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.")
("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS"))
+ ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"))
("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation")
;
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index c57e3c849c..7095383959 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -126,6 +126,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
std::string defaultMsgGroup;
bool timestampRcvMsgs;
double linkMaintenanceInterval; // FIXME aconway 2012-02-13: consistent parsing of SECONDS values.
+ uint16_t linkHeartbeatInterval;
uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation
private:
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 474472719e..03ff3d5793 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -148,6 +148,9 @@ Connection::~Connection()
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
if (!isShadow()) broker.getConnectionCounter().dec_connectionCount();
}
@@ -300,6 +303,9 @@ void Connection::close(connection::CloseCode code, const string& text)
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
@@ -313,6 +319,9 @@ void Connection::sendClose() {
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
@@ -326,6 +335,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
@@ -435,6 +447,31 @@ struct ConnectionHeartbeatTask : public sys::TimerTask {
}
};
+class LinkHeartbeatTask : public qpid::sys::TimerTask {
+ sys::Timer& timer;
+ Connection& connection;
+ bool heartbeatSeen;
+
+ void fire() {
+ if (!heartbeatSeen) {
+ QPID_LOG(error, "Federation link connection " << connection.getMgmtId() << " missed 2 heartbeats - closing connection");
+ connection.abort();
+ } else {
+ heartbeatSeen = false;
+ // Setup next firing
+ setupNextFire();
+ timer.add(this);
+ }
+ }
+
+public:
+ LinkHeartbeatTask(sys::Timer& t, qpid::sys::Duration period, Connection& c) :
+ TimerTask(period, "LinkHeartbeatTask"), timer(t), connection(c), heartbeatSeen(false) {}
+
+ void heartbeatReceived() { heartbeatSeen = true; }
+};
+
+
void Connection::abort()
{
// Make sure that we don't try to send a heartbeat as we're
@@ -460,10 +497,21 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat)
}
}
+void Connection::startLinkHeartbeatTimeoutTask() {
+ if (!linkHeartbeatTimer && heartbeat > 0) {
+ linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this);
+ timer.add(linkHeartbeatTimer);
+ }
+}
+
void Connection::restartTimeout()
{
if (timeoutTimer)
timeoutTimer->touch();
+
+ if (linkHeartbeatTimer) {
+ static_cast<LinkHeartbeatTask*>(linkHeartbeatTimer.get())->heartbeatReceived();
+ }
}
bool Connection::isOpen() { return adapter.isOpen(); }
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 1b8bd83139..42bd10c095 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -166,6 +166,7 @@ class Connection : public sys::ConnectionInputHandler,
bool isOpen();
bool isLink() { return link; }
+ void startLinkHeartbeatTimeoutTask();
// Used by cluster during catch-up, see cluster::OutputInterceptor
void doIoCallbacks();
@@ -189,7 +190,7 @@ class Connection : public sys::ConnectionInputHandler,
LinkRegistry& links;
management::ManagementAgent* agent;
sys::Timer& timer;
- boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
+ boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
ErrorListener* errorListener;
uint64_t objectId;
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 9594ea34ef..8db136a448 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -370,8 +370,14 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax,
maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
connection.setFrameMax(maxFrameSize);
- connection.setHeartbeat(heartbeatMax);
- proxy.tuneOk(channelMax, maxFrameSize, heartbeatMax);
+ // this method is only ever called when this Connection
+ // is a federation link where this Broker is acting as
+ // a client to another Broker
+ uint16_t hb = std::min(connection.getBroker().getOptions().linkHeartbeatInterval, heartbeatMax);
+ connection.setHeartbeat(hb);
+ connection.startLinkHeartbeatTimeoutTask();
+
+ proxy.tuneOk(channelMax, maxFrameSize, hb);
proxy.open("/", Array(), true);
}