diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-11-26 10:10:26 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-11-26 10:10:26 +0000 |
| commit | 024a52795718a00d2c6a12da7a0d1cd1505859e6 (patch) | |
| tree | 3754fda4cb42ea928d6011cef213164ea14a2ece /qpid/java/client/src | |
| parent | b8026c0c4c192d50f6b601346142661ef7ea6f30 (diff) | |
| download | qpid-python-024a52795718a00d2c6a12da7a0d1cd1505859e6.tar.gz | |
QPID-2796 : Added Java system test for heartbeating
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1413539 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
9 files changed, 86 insertions, 131 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6f2631ac05..a0e659c359 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1560,4 +1560,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect + localAddress + " to " + remoteAddress); } } + + void setHeartbeatListener(HeartbeatListener listener) + { + _delegate.setHeartbeatListener(listener); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index b6f25a2cef..a8fdaeb65c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -78,4 +78,6 @@ public interface AMQConnectionDelegate * @return true if the feature is supported by the server */ boolean isSupportedServerFeature(final String featureName); + + void setHeartbeatListener(HeartbeatListener listener); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index a8cf947f6d..69e79d42a0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -422,6 +422,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return featureSupported; } + @Override + public void setHeartbeatListener(HeartbeatListener listener) + { + ((ClientConnectionDelegate)(_qpidConnection.getConnectionDelegate())).setHeartbeatListener(listener); + } + private ConnectionSettings retrieveConnectionSettings(BrokerDetails brokerDetail) { ConnectionSettings conSettings = brokerDetail.buildConnectionSettings(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 25af7003d0..67d7c2a78c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -378,4 +378,10 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate // we just hardcode JMS selectors as supported. return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName); } + + @Override + public void setHeartbeatListener(HeartbeatListener listener) + { + _conn.getProtocolHandler().setHeartbeatListener(listener); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java new file mode 100644 index 0000000000..32a7cb0b73 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client; + +public interface HeartbeatListener +{ + void heartbeatReceived(); + + void heartbeatSent(); + + static final HeartbeatListener DEFAULT = new HeartbeatListener() + { + public void heartbeatReceived() + { + } + + public void heartbeatSent() + { + } + }; +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 04d57c9fa2..816caac824 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.protocol; +import org.apache.qpid.client.HeartbeatListener; import org.apache.qpid.util.BytesDataOutput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -180,6 +181,7 @@ public class AMQProtocolHandler implements ProtocolEngine private Sender<ByteBuffer> _sender; private long _lastReadTime = System.currentTimeMillis(); private long _lastWriteTime = System.currentTimeMillis(); + private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -302,7 +304,6 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("Protocol Session [" + this + "] idle: reader"); // failover: - HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); _network.close(); } @@ -311,7 +312,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("Protocol Session [" + this + "] idle: reader"); writeFrame(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); + _heartbeatListener.heartbeatSent(); } /** @@ -473,8 +474,6 @@ public class AMQProtocolHandler implements ProtocolEngine final AMQBody bodyFrame = frame.getBodyFrame(); - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - bodyFrame.handle(frame.getChannel(), _protocolSession); _connection.bytesReceived(_readBytes); @@ -910,7 +909,6 @@ public class AMQProtocolHandler implements ProtocolEngine { _network.setMaxWriteIdle(delay); _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } @@ -925,5 +923,13 @@ public class AMQProtocolHandler implements ProtocolEngine } + public void setHeartbeatListener(HeartbeatListener listener) + { + _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener; + } + public void heartbeatBodyReceived() + { + _heartbeatListener.heartbeatReceived(); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index cf521c8892..aed10cf15f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -267,7 +267,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException { - + _protocolHandler.heartbeatBodyReceived(); } /** diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java deleted file mode 100644 index d387a8ba93..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -class HeartbeatDiagnostics -{ - private static final Diagnostics _impl = init(); - - private HeartbeatDiagnostics() - { - } - - private static Diagnostics init() - { - return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off(); - } - - static void sent() - { - _impl.sent(); - } - - static void timeout() - { - _impl.timeout(); - } - - static void received(boolean heartbeat) - { - _impl.received(heartbeat); - } - - static void init(int delay, int timeout) - { - _impl.init(delay, timeout); - } - - private static interface Diagnostics - { - void sent(); - void timeout(); - void received(boolean heartbeat); - void init(int delay, int timeout); - } - - private static class On implements Diagnostics - { - private final String[] messages = new String[50]; - private int i; - - private void save(String msg) - { - messages[i++] = msg; - if(i >= messages.length){ - i = 0;//i.e. a circular buffer - } - } - - public void sent() - { - save(System.currentTimeMillis() + ": sent heartbeat"); - } - - public void timeout() - { - for(int i = 0; i < messages.length; i++) - { - if(messages[i] != null) - { - System.out.println(messages[i]); - } - } - System.out.println(System.currentTimeMillis() + ": timed out"); - } - - public void received(boolean heartbeat) - { - save(System.currentTimeMillis() + ": received " + (heartbeat ? "heartbeat" : "data")); - } - - public void init(int delay, int timeout) - { - System.out.println(System.currentTimeMillis() + ": initialised delay=" + delay + ", timeout=" + timeout); - } - } - - private static class Off implements Diagnostics - { - public void sent() - { - - } - public void timeout() - { - - } - public void received(boolean heartbeat) - { - - } - - public void init(int delay, int timeout) - { - - } - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java index 3c9a6e1500..4789dd0ed7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client.transport; +import org.apache.qpid.client.HeartbeatListener; +import org.apache.qpid.transport.ConnectionHeartbeat; import org.ietf.jgss.GSSContext; import org.ietf.jgss.GSSException; import org.ietf.jgss.GSSManager; @@ -70,6 +72,7 @@ public class ClientConnectionDelegate extends ClientDelegate } private final ConnectionURL _connectionURL; + private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT; /** * @param settings @@ -165,4 +168,19 @@ public class ClientConnectionDelegate extends ClientDelegate return null; } + + @Override + public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat) + { + // ClientDelegate simply responds to heartbeats with heartbeats + _heartbeatListener.heartbeatReceived(); + super.connectionHeartbeat(conn, hearbeat); + _heartbeatListener.heartbeatSent(); + } + + + public void setHeartbeatListener(HeartbeatListener listener) + { + _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener; + } } |
