summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2013-10-30 21:38:03 +0000
committerKeith Wall <kwall@apache.org>2013-10-30 21:38:03 +0000
commit9b1bf6023a9869af744e2fbc03664d41ced37df6 (patch)
tree8999f016b40ce753bb560589326fa84d507759ea /qpid/java/client
parentf6c119bb54dceaa1451f31c7c0be504a7f6bf3ca (diff)
downloadqpid-python-9b1bf6023a9869af744e2fbc03664d41ced37df6.tar.gz
QPID-4534: unify client heartbeat system properties/connection url options.
* Connection url 'heartbeat' broker-option (and deprecated 'idle_timeout') now understood for all protocols * System property 'qpid.heartbeat' (and deprecated 'amqj.heartbeat.delay' and 'idle_timeout') now understood for all protocols * Enhanced heartbeat system tests * Docbook updates Original patch from Keith Wall, plus updates from Robbie Gemmell git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1537313 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java23
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java35
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java18
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java56
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java61
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java3
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java26
10 files changed, 110 insertions, 150 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index 597096db57..25d37aafb1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -290,6 +290,19 @@ public class AMQBrokerDetails implements BrokerDetails
}
}
+ private int getIntegerProperty(String key)
+ {
+ String stringValue = getProperty(key);
+ try
+ {
+ return Integer.parseInt(stringValue);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new IllegalArgumentException("Cannot parse key " + key + " with value '" + stringValue + "' as integer.", e);
+ }
+ }
+
public String toString()
{
StringBuffer sb = new StringBuffer();
@@ -464,6 +477,16 @@ public class AMQBrokerDetails implements BrokerDetails
conSettings.setConnectTimeout(lookupConnectTimeout());
+ if (getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null)
+ {
+ conSettings.setHeartbeatInterval(getIntegerProperty(BrokerDetails.OPTIONS_HEARTBEAT));
+ }
+ else if (getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
+ {
+ conSettings.setHeartbeatInterval(getIntegerProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) / 1000);
+ }
+
return conSettings;
}
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 66590aa0d7..95b1178407 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -29,7 +29,7 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.transport.ClientConnectionDelegate;
import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.configuration.ClientProperties;
+
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
@@ -448,8 +448,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
// Ignore
}
- conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail));
-
//Check connection-level ssl override setting
String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
if(connectionSslOption != null)
@@ -470,37 +468,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return conSettings;
}
-
- // The idle_timeout prop is in milisecs while
- // the new heartbeat prop is in secs
- private int getHeartbeatInterval(BrokerDetails brokerDetail)
- {
- int heartbeat = 0;
- if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
- {
- _logger.warn("Broker property idle_timeout=<mili_secs> is deprecated, please use heartbeat=<secs>");
- heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))/1000;
- }
- else if (brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null)
- {
- heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT));
- }
- else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null)
- {
- heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
- _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
- }
- else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null)
- {
- heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
- }
- else
- {
- heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT);
- }
- return heartbeat;
- }
-
protected org.apache.qpid.transport.Connection getQpidConnection()
{
return _qpidConnection;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 340aca70eb..dfbf7ec60a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -124,10 +124,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()),
_conn.getProtocolHandler());
+
_conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
- _conn.getProtocolHandler().getProtocolSession().init();
+ _conn.getProtocolHandler().getProtocolSession().init(settings);
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java
index b1ec7216bc..40264f837e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java
@@ -26,9 +26,20 @@ public class ConnectionTuneParameters
private int _channelMax;
- private int _heartbeat;
+ /** Heart-beating interval in seconds, null if not set, use 0 to disable */
+ private Integer _heartbeat;
- private long _txnLimit;
+ private float _heartbeatTimeoutFactor;
+
+ public float getHeartbeatTimeoutFactor()
+ {
+ return _heartbeatTimeoutFactor;
+ }
+
+ public void setHeartbeatTimeoutFactor(float heartbeatTimeoutFactor)
+ {
+ _heartbeatTimeoutFactor = heartbeatTimeoutFactor;
+ }
public long getFrameMax()
{
@@ -50,23 +61,13 @@ public class ConnectionTuneParameters
_channelMax = channelMax;
}
- public int getHeartbeat()
+ public Integer getHeartbeat()
{
return _heartbeat;
}
- public void setHeartbeat(int hearbeat)
+ public void setHeartbeat(Integer hearbeat)
{
_heartbeat = hearbeat;
}
-
- public long getTxnLimit()
- {
- return _txnLimit;
- }
-
- public void setTxnLimit(long txnLimit)
- {
- _txnLimit = txnLimit;
- }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index f77718672e..617380e149 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -52,20 +52,20 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con
_logger.debug("ConnectionTune frame received");
final MethodRegistry methodRegistry = session.getMethodRegistry();
-
ConnectionTuneParameters params = session.getConnectionTuneParameters();
- if (params == null)
- {
- params = new ConnectionTuneParameters();
- }
-
+
int maxChannelNumber = frame.getChannelMax();
//0 implies no limit, except that forced by protocol limitations (0xFFFF)
params.setChannelMax(maxChannelNumber == 0 ? AMQProtocolSession.MAX_CHANNEL_MAX : maxChannelNumber);
-
params.setFrameMax(frame.getFrameMax());
- params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
- session.setConnectionTuneParameters(params);
+
+ //if the heart beat delay hasn't been configured, we use the broker-supplied value
+ if (params.getHeartbeat() == null)
+ {
+ params.setHeartbeat(frame.getHeartbeat());
+ }
+
+ session.tuneConnection(params);
session.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 816caac824..37e6904378 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -902,13 +902,13 @@ public class AMQProtocolHandler implements ProtocolEngine
return _sender;
}
- /** @param delay delay in seconds (not ms) */
- void initHeartbeats(int delay)
+ void initHeartbeats(int delay, float timeoutFactor)
{
if (delay > 0)
{
_network.setMaxWriteIdle(delay);
- _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ int readerIdle = (int)(delay * timeoutFactor);
+ _network.setMaxReadIdle(readerIdle);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 67bd8de846..4027ccb725 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -43,6 +43,7 @@ import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -63,18 +64,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class);
- public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
-
//Usable channels are numbered 1 to <ChannelMax>
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 1;
- protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
-
- protected static final String AMQ_CONNECTION = "AMQConnection";
-
- protected static final String SASL_CLIENT = "SASLClient";
-
private final AMQProtocolHandler _protocolHandler;
private ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
@@ -120,13 +113,38 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_connection = connection;
}
- public void init()
+ public void init(ConnectionSettings settings)
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
+ initialiseTuneParameters(settings);
+
_protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
}
+ public ConnectionTuneParameters getConnectionTuneParameters()
+ {
+ return _connectionTuneParameters;
+ }
+
+ private void initialiseTuneParameters(ConnectionSettings settings)
+ {
+ _connectionTuneParameters = new ConnectionTuneParameters();
+ _connectionTuneParameters.setHeartbeat(settings.getHeartbeatInterval08());
+ _connectionTuneParameters.setHeartbeatTimeoutFactor(settings.getHeartbeatTimeoutFactor());
+ }
+
+ public void tuneConnection(ConnectionTuneParameters params)
+ {
+ _connectionTuneParameters = params;
+ AMQConnection con = getAMQConnection();
+
+ con.setMaximumChannelCount(params.getChannelMax());
+ con.setMaximumFrameSize(params.getFrameMax());
+
+ _protocolHandler.initHeartbeats(params.getHeartbeat(), params.getHeartbeatTimeoutFactor());
+ }
+
public String getClientID()
{
try
@@ -170,24 +188,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_saslClient = client;
}
- public ConnectionTuneParameters getConnectionTuneParameters()
- {
- return _connectionTuneParameters;
- }
-
- public void setConnectionTuneParameters(ConnectionTuneParameters params)
- {
- _connectionTuneParameters = params;
- AMQConnection con = getAMQConnection();
-
- con.setMaximumChannelCount(params.getChannelMax());
- con.setMaximumFrameSize(params.getFrameMax());
- _protocolHandler.initHeartbeats((int) params.getHeartbeat());
- }
-
/**
- * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
- * dispatcher thread.
+ * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
*
* @param message
*
@@ -409,7 +411,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Setting ProtocolVersion to :" + pv);
+ _logger.debug("Setting ProtocolVersion to :" + pv);
}
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(pv);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java
deleted file mode 100644
index 35ea44a331..0000000000
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class HeartbeatConfig
-{
- private static final Logger _logger = LoggerFactory.getLogger(HeartbeatConfig.class);
- static final HeartbeatConfig CONFIG = new HeartbeatConfig();
-
- /**
- * The factor used to get the timeout from the delay between heartbeats.
- */
- private float timeoutFactor = 2;
-
- HeartbeatConfig()
- {
- String property = System.getProperty("amqj.heartbeat.timeoutFactor");
- if (property != null)
- {
- try
- {
- timeoutFactor = Float.parseFloat(property);
- }
- catch (NumberFormatException e)
- {
- _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property);
- }
- }
- }
-
- float getTimeoutFactor()
- {
- return timeoutFactor;
- }
-
- int getTimeout(int writeDelay)
- {
- return (int) (timeoutFactor * writeDelay);
- }
-}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index 4a7fca1efa..b039d8b005 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -34,8 +34,9 @@ public interface BrokerDetails
public static final String OPTIONS_RETRY = "retries";
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
- public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; // deprecated
public static final String OPTIONS_HEARTBEAT = "heartbeat";
+ @Deprecated
+ public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout";
public static final String OPTIONS_SASL_MECHS = "sasl_mechs";
public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption";
public static final String OPTIONS_SSL = "ssl";
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
index 1e9e5b00a5..ad9d3d3516 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
@@ -164,4 +164,30 @@ public class BrokerDetailsTest extends TestCase
assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL)));
}
+
+ public void testHeartbeatDefaultsToNull() throws Exception
+ {
+ String brokerURL = "tcp://localhost:5672";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+ assertNull("unexpected default value for " + BrokerDetails.OPTIONS_HEARTBEAT, broker.getProperty(BrokerDetails.OPTIONS_HEARTBEAT));
+ }
+
+ public void testOverriddingHeartbeat() throws Exception
+ {
+ String brokerURL = "tcp://localhost:5672?heartbeat='60'";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+ assertEquals(60, Integer.parseInt(broker.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)));
+
+ assertEquals(Integer.valueOf(60), broker.buildConnectionSettings().getHeartbeatInterval08());
+ }
+
+ @SuppressWarnings("deprecation")
+ public void testLegacyHeartbeat() throws Exception
+ {
+ String brokerURL = "tcp://localhost:5672?idle_timeout='60000'";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+ assertEquals(60000, Integer.parseInt(broker.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
+
+ assertEquals(Integer.valueOf(60), broker.buildConnectionSettings().getHeartbeatInterval08());
+ }
}