diff options
| author | Keith Wall <kwall@apache.org> | 2013-10-30 21:38:03 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2013-10-30 21:38:03 +0000 |
| commit | 9b1bf6023a9869af744e2fbc03664d41ced37df6 (patch) | |
| tree | 8999f016b40ce753bb560589326fa84d507759ea /qpid/java/client | |
| parent | f6c119bb54dceaa1451f31c7c0be504a7f6bf3ca (diff) | |
| download | qpid-python-9b1bf6023a9869af744e2fbc03664d41ced37df6.tar.gz | |
QPID-4534: unify client heartbeat system properties/connection url options.
* Connection url 'heartbeat' broker-option (and deprecated 'idle_timeout') now understood for all protocols
* System property 'qpid.heartbeat' (and deprecated 'amqj.heartbeat.delay' and 'idle_timeout') now understood for all protocols
* Enhanced heartbeat system tests
* Docbook updates
Original patch from Keith Wall, plus updates from Robbie Gemmell
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1537313 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
10 files changed, 110 insertions, 150 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 597096db57..25d37aafb1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -290,6 +290,19 @@ public class AMQBrokerDetails implements BrokerDetails } } + private int getIntegerProperty(String key) + { + String stringValue = getProperty(key); + try + { + return Integer.parseInt(stringValue); + } + catch (NumberFormatException e) + { + throw new IllegalArgumentException("Cannot parse key " + key + " with value '" + stringValue + "' as integer.", e); + } + } + public String toString() { StringBuffer sb = new StringBuffer(); @@ -464,6 +477,16 @@ public class AMQBrokerDetails implements BrokerDetails conSettings.setConnectTimeout(lookupConnectTimeout()); + if (getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null) + { + conSettings.setHeartbeatInterval(getIntegerProperty(BrokerDetails.OPTIONS_HEARTBEAT)); + } + else if (getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) + { + conSettings.setHeartbeatInterval(getIntegerProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) / 1000); + } + return conSettings; } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 66590aa0d7..95b1178407 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -29,7 +29,7 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.transport.ClientConnectionDelegate; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.configuration.ClientProperties; + import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; @@ -448,8 +448,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec // Ignore } - conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail)); - //Check connection-level ssl override setting String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL); if(connectionSslOption != null) @@ -470,37 +468,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return conSettings; } - - // The idle_timeout prop is in milisecs while - // the new heartbeat prop is in secs - private int getHeartbeatInterval(BrokerDetails brokerDetail) - { - int heartbeat = 0; - if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) - { - _logger.warn("Broker property idle_timeout=<mili_secs> is deprecated, please use heartbeat=<secs>"); - heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))/1000; - } - else if (brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null) - { - heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)); - } - else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null) - { - heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000; - _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>"); - } - else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null) - { - heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); - } - else - { - heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT); - } - return heartbeat; - } - protected org.apache.qpid.transport.Connection getQpidConnection() { return _qpidConnection; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 340aca70eb..dfbf7ec60a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -124,10 +124,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), _conn.getProtocolHandler()); + _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); - _conn.getProtocolHandler().getProtocolSession().init(); + _conn.getProtocolHandler().getProtocolSession().init(settings); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java index b1ec7216bc..40264f837e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java @@ -26,9 +26,20 @@ public class ConnectionTuneParameters private int _channelMax; - private int _heartbeat; + /** Heart-beating interval in seconds, null if not set, use 0 to disable */ + private Integer _heartbeat; - private long _txnLimit; + private float _heartbeatTimeoutFactor; + + public float getHeartbeatTimeoutFactor() + { + return _heartbeatTimeoutFactor; + } + + public void setHeartbeatTimeoutFactor(float heartbeatTimeoutFactor) + { + _heartbeatTimeoutFactor = heartbeatTimeoutFactor; + } public long getFrameMax() { @@ -50,23 +61,13 @@ public class ConnectionTuneParameters _channelMax = channelMax; } - public int getHeartbeat() + public Integer getHeartbeat() { return _heartbeat; } - public void setHeartbeat(int hearbeat) + public void setHeartbeat(Integer hearbeat) { _heartbeat = hearbeat; } - - public long getTxnLimit() - { - return _txnLimit; - } - - public void setTxnLimit(long txnLimit) - { - _txnLimit = txnLimit; - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index f77718672e..617380e149 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -52,20 +52,20 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con _logger.debug("ConnectionTune frame received"); final MethodRegistry methodRegistry = session.getMethodRegistry(); - ConnectionTuneParameters params = session.getConnectionTuneParameters(); - if (params == null) - { - params = new ConnectionTuneParameters(); - } - + int maxChannelNumber = frame.getChannelMax(); //0 implies no limit, except that forced by protocol limitations (0xFFFF) params.setChannelMax(maxChannelNumber == 0 ? AMQProtocolSession.MAX_CHANNEL_MAX : maxChannelNumber); - params.setFrameMax(frame.getFrameMax()); - params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat())); - session.setConnectionTuneParameters(params); + + //if the heart beat delay hasn't been configured, we use the broker-supplied value + if (params.getHeartbeat() == null) + { + params.setHeartbeat(frame.getHeartbeat()); + } + + session.tuneConnection(params); session.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 816caac824..37e6904378 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -902,13 +902,13 @@ public class AMQProtocolHandler implements ProtocolEngine return _sender; } - /** @param delay delay in seconds (not ms) */ - void initHeartbeats(int delay) + void initHeartbeats(int delay, float timeoutFactor) { if (delay > 0) { _network.setMaxWriteIdle(delay); - _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + int readerIdle = (int)(delay * timeoutFactor); + _network.setMaxReadIdle(readerIdle); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 67bd8de846..4027ccb725 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -43,6 +43,7 @@ import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -63,18 +64,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class); - public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; - //Usable channels are numbered 1 to <ChannelMax> public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 1; - protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters"; - - protected static final String AMQ_CONNECTION = "AMQConnection"; - - protected static final String SASL_CLIENT = "SASLClient"; - private final AMQProtocolHandler _protocolHandler; private ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); @@ -120,13 +113,38 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _connection = connection; } - public void init() + public void init(ConnectionSettings settings) { // start the process of setting up the connection. This is the first place that // data is written to the server. + initialiseTuneParameters(settings); + _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion())); } + public ConnectionTuneParameters getConnectionTuneParameters() + { + return _connectionTuneParameters; + } + + private void initialiseTuneParameters(ConnectionSettings settings) + { + _connectionTuneParameters = new ConnectionTuneParameters(); + _connectionTuneParameters.setHeartbeat(settings.getHeartbeatInterval08()); + _connectionTuneParameters.setHeartbeatTimeoutFactor(settings.getHeartbeatTimeoutFactor()); + } + + public void tuneConnection(ConnectionTuneParameters params) + { + _connectionTuneParameters = params; + AMQConnection con = getAMQConnection(); + + con.setMaximumChannelCount(params.getChannelMax()); + con.setMaximumFrameSize(params.getFrameMax()); + + _protocolHandler.initHeartbeats(params.getHeartbeat(), params.getHeartbeatTimeoutFactor()); + } + public String getClientID() { try @@ -170,24 +188,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _saslClient = client; } - public ConnectionTuneParameters getConnectionTuneParameters() - { - return _connectionTuneParameters; - } - - public void setConnectionTuneParameters(ConnectionTuneParameters params) - { - _connectionTuneParameters = params; - AMQConnection con = getAMQConnection(); - - con.setMaximumChannelCount(params.getChannelMax()); - con.setMaximumFrameSize(params.getFrameMax()); - _protocolHandler.initHeartbeats((int) params.getHeartbeat()); - } - /** - * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA - * dispatcher thread. + * Callback invoked from the BasicDeliverMethodHandler when a message has been received. * * @param message * @@ -409,7 +411,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { if (_logger.isDebugEnabled()) { - _logger.debug("Setting ProtocolVersion to :" + pv); + _logger.debug("Setting ProtocolVersion to :" + pv); } _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(pv); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java deleted file mode 100644 index 35ea44a331..0000000000 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class HeartbeatConfig -{ - private static final Logger _logger = LoggerFactory.getLogger(HeartbeatConfig.class); - static final HeartbeatConfig CONFIG = new HeartbeatConfig(); - - /** - * The factor used to get the timeout from the delay between heartbeats. - */ - private float timeoutFactor = 2; - - HeartbeatConfig() - { - String property = System.getProperty("amqj.heartbeat.timeoutFactor"); - if (property != null) - { - try - { - timeoutFactor = Float.parseFloat(property); - } - catch (NumberFormatException e) - { - _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property); - } - } - } - - float getTimeoutFactor() - { - return timeoutFactor; - } - - int getTimeout(int writeDelay) - { - return (int) (timeoutFactor * writeDelay); - } -} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 4a7fca1efa..b039d8b005 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -34,8 +34,9 @@ public interface BrokerDetails public static final String OPTIONS_RETRY = "retries"; public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; - public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; // deprecated public static final String OPTIONS_HEARTBEAT = "heartbeat"; + @Deprecated + public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; public static final String OPTIONS_SASL_MECHS = "sasl_mechs"; public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption"; public static final String OPTIONS_SSL = "ssl"; diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java index 1e9e5b00a5..ad9d3d3516 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java @@ -164,4 +164,30 @@ public class BrokerDetailsTest extends TestCase assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL))); } + + public void testHeartbeatDefaultsToNull() throws Exception + { + String brokerURL = "tcp://localhost:5672"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + assertNull("unexpected default value for " + BrokerDetails.OPTIONS_HEARTBEAT, broker.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)); + } + + public void testOverriddingHeartbeat() throws Exception + { + String brokerURL = "tcp://localhost:5672?heartbeat='60'"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + assertEquals(60, Integer.parseInt(broker.getProperty(BrokerDetails.OPTIONS_HEARTBEAT))); + + assertEquals(Integer.valueOf(60), broker.buildConnectionSettings().getHeartbeatInterval08()); + } + + @SuppressWarnings("deprecation") + public void testLegacyHeartbeat() throws Exception + { + String brokerURL = "tcp://localhost:5672?idle_timeout='60000'"; + AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL); + assertEquals(60000, Integer.parseInt(broker.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))); + + assertEquals(Integer.valueOf(60), broker.buildConnectionSettings().getHeartbeatInterval08()); + } } |
