summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-11-26 10:10:26 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-11-26 10:10:26 +0000
commit92626b52ee79478f045dd3308b1c5087095d3d79 (patch)
tree038635639bb5079e07773fddc14843dc5ab9aaf4 /java
parent33e3ebd7c6b63c6506afc998db9229716e96ee4f (diff)
downloadqpid-python-92626b52ee79478f045dd3308b1c5087095d3d79.tar.gz
QPID-2796 : Added Java system test for heartbeating
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1413539 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java37
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java125
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java18
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java73
10 files changed, 159 insertions, 131 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 6f2631ac05..a0e659c359 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1560,4 +1560,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
+ localAddress + " to " + remoteAddress);
}
}
+
+ void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _delegate.setHeartbeatListener(listener);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index b6f25a2cef..a8fdaeb65c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -78,4 +78,6 @@ public interface AMQConnectionDelegate
* @return true if the feature is supported by the server
*/
boolean isSupportedServerFeature(final String featureName);
+
+ void setHeartbeatListener(HeartbeatListener listener);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index a8cf947f6d..69e79d42a0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -422,6 +422,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return featureSupported;
}
+ @Override
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ ((ClientConnectionDelegate)(_qpidConnection.getConnectionDelegate())).setHeartbeatListener(listener);
+ }
+
private ConnectionSettings retrieveConnectionSettings(BrokerDetails brokerDetail)
{
ConnectionSettings conSettings = brokerDetail.buildConnectionSettings();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 25af7003d0..67d7c2a78c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -378,4 +378,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
// we just hardcode JMS selectors as supported.
return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
}
+
+ @Override
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _conn.getProtocolHandler().setHeartbeatListener(listener);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java b/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java
new file mode 100644
index 0000000000..32a7cb0b73
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+public interface HeartbeatListener
+{
+ void heartbeatReceived();
+
+ void heartbeatSent();
+
+ static final HeartbeatListener DEFAULT = new HeartbeatListener()
+ {
+ public void heartbeatReceived()
+ {
+ }
+
+ public void heartbeatSent()
+ {
+ }
+ };
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 04d57c9fa2..816caac824 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.protocol;
+import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.util.BytesDataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -180,6 +181,7 @@ public class AMQProtocolHandler implements ProtocolEngine
private Sender<ByteBuffer> _sender;
private long _lastReadTime = System.currentTimeMillis();
private long _lastWriteTime = System.currentTimeMillis();
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -302,7 +304,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
// failover:
- HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
_network.close();
}
@@ -311,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
writeFrame(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
+ _heartbeatListener.heartbeatSent();
}
/**
@@ -473,8 +474,6 @@ public class AMQProtocolHandler implements ProtocolEngine
final AMQBody bodyFrame = frame.getBodyFrame();
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_readBytes);
@@ -910,7 +909,6 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_network.setMaxWriteIdle(delay);
_network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
@@ -925,5 +923,13 @@ public class AMQProtocolHandler implements ProtocolEngine
}
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
+ public void heartbeatBodyReceived()
+ {
+ _heartbeatListener.heartbeatReceived();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index cf521c8892..aed10cf15f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -267,7 +267,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
{
-
+ _protocolHandler.heartbeatBodyReceived();
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java b/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
deleted file mode 100644
index d387a8ba93..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-class HeartbeatDiagnostics
-{
- private static final Diagnostics _impl = init();
-
- private HeartbeatDiagnostics()
- {
- }
-
- private static Diagnostics init()
- {
- return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off();
- }
-
- static void sent()
- {
- _impl.sent();
- }
-
- static void timeout()
- {
- _impl.timeout();
- }
-
- static void received(boolean heartbeat)
- {
- _impl.received(heartbeat);
- }
-
- static void init(int delay, int timeout)
- {
- _impl.init(delay, timeout);
- }
-
- private static interface Diagnostics
- {
- void sent();
- void timeout();
- void received(boolean heartbeat);
- void init(int delay, int timeout);
- }
-
- private static class On implements Diagnostics
- {
- private final String[] messages = new String[50];
- private int i;
-
- private void save(String msg)
- {
- messages[i++] = msg;
- if(i >= messages.length){
- i = 0;//i.e. a circular buffer
- }
- }
-
- public void sent()
- {
- save(System.currentTimeMillis() + ": sent heartbeat");
- }
-
- public void timeout()
- {
- for(int i = 0; i < messages.length; i++)
- {
- if(messages[i] != null)
- {
- System.out.println(messages[i]);
- }
- }
- System.out.println(System.currentTimeMillis() + ": timed out");
- }
-
- public void received(boolean heartbeat)
- {
- save(System.currentTimeMillis() + ": received " + (heartbeat ? "heartbeat" : "data"));
- }
-
- public void init(int delay, int timeout)
- {
- System.out.println(System.currentTimeMillis() + ": initialised delay=" + delay + ", timeout=" + timeout);
- }
- }
-
- private static class Off implements Diagnostics
- {
- public void sent()
- {
-
- }
- public void timeout()
- {
-
- }
- public void received(boolean heartbeat)
- {
-
- }
-
- public void init(int delay, int timeout)
- {
-
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
index 3c9a6e1500..4789dd0ed7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client.transport;
+import org.apache.qpid.client.HeartbeatListener;
+import org.apache.qpid.transport.ConnectionHeartbeat;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
@@ -70,6 +72,7 @@ public class ClientConnectionDelegate extends ClientDelegate
}
private final ConnectionURL _connectionURL;
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* @param settings
@@ -165,4 +168,19 @@ public class ClientConnectionDelegate extends ClientDelegate
return null;
}
+
+ @Override
+ public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+ {
+ // ClientDelegate simply responds to heartbeats with heartbeats
+ _heartbeatListener.heartbeatReceived();
+ super.connectionHeartbeat(conn, hearbeat);
+ _heartbeatListener.heartbeatSent();
+ }
+
+
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java b/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
new file mode 100644
index 0000000000..bb36c7dca4
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class HeartbeatTest extends QpidBrokerTestCase
+{
+ public void testHeartbeats() throws Exception
+ {
+ setTestSystemProperty("amqj.heartbeat.delay","1");
+ AMQConnection conn = (AMQConnection) getConnection();
+ TestListener listener = new TestListener();
+ conn.setHeartbeatListener(listener);
+ conn.start();
+
+ Thread.sleep(2500);
+
+ assertTrue("Too few heartbeats received: "+listener._heartbeatsReceived+" (expected at least 2)", listener._heartbeatsReceived>=2);
+ assertTrue("Too few heartbeats sent "+listener._heartbeatsSent+" (expected at least 2)", listener._heartbeatsSent>=2);
+
+ conn.close();
+ }
+
+ public void testNoHeartbeats() throws Exception
+ {
+ setTestSystemProperty("amqj.heartbeat.delay","0");
+ AMQConnection conn = (AMQConnection) getConnection();
+ TestListener listener = new TestListener();
+ conn.setHeartbeatListener(listener);
+ conn.start();
+
+ Thread.sleep(2500);
+
+ assertEquals("Heartbeats unexpectedly received", 0, listener._heartbeatsReceived);
+ assertEquals("Heartbeats unexpectedly sent ", 0, listener._heartbeatsSent);
+
+ conn.close();
+ }
+
+ private class TestListener implements HeartbeatListener
+ {
+ int _heartbeatsReceived;
+ int _heartbeatsSent;
+ @Override
+ public void heartbeatReceived()
+ {
+ _heartbeatsReceived++;
+ }
+
+ @Override
+ public void heartbeatSent()
+ {
+ _heartbeatsSent++;
+ }
+ }
+}