From bc9a4cde3a75ce8693649873be9534073b4aeb58 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 21 Jan 2010 02:45:35 +0000 Subject: The commit contains fixes for QPID-2351, QPID-2350 and some ground work for QPID-2352 - Modified Connection.java to add more than one ConnectionListener. This was done to facilitate the SASL encryption patch - QPID-2352. - Changed the access modifier for getSaslClient method to "public" to allow the SaslClient to be retrieved by the SASL encryption code -QPID-2352. - Introduced ConnectionSettings object to hold all the configuration options. Previous constructor methods remains unchanged. - Modified the ClientDelegate to handle heartbeat and idelTimeout value properly. - Added support to specify config options via the connection URL - QPID-2351 - Added support to handle the heartbeat/idle_timeout options properly in the 0-10 code - QPID-2350. However once QPID-2343 is completed, the code will be further simplified. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901506 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQBrokerDetails.java | 6 +- .../java/org/apache/qpid/client/AMQConnection.java | 4 +- .../apache/qpid/client/AMQConnectionDelegate.java | 2 - .../qpid/client/AMQConnectionDelegate_0_10.java | 114 +++++++++++++++------ .../qpid/client/AMQConnectionDelegate_8_0.java | 2 - .../qpid/client/protocol/AMQProtocolHandler.java | 2 +- .../java/org/apache/qpid/jms/BrokerDetails.java | 10 +- .../java/org/apache/qpid/jms/ConnectionURL.java | 3 +- 8 files changed, 95 insertions(+), 48 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 414638dea4..6b5673509e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -255,11 +255,11 @@ public class AMQBrokerDetails implements BrokerDetails return BrokerDetails.DEFAULT_CONNECT_TIMEOUT; } - public boolean useSSL() + public boolean getBooleanProperty(String propName) { - if (_options.containsKey(ConnectionURL.OPTIONS_SSL)) + if (_options.containsKey(propName)) { - return Boolean.parseBoolean(_options.get(ConnectionURL.OPTIONS_SSL)); + return Boolean.parseBoolean(_options.get(propName)); } return false; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 125cb6cae3..cdf1167185 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -381,10 +381,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect useSSL ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port - + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'") + + "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'") : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port - + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig); + + "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig); } public AMQConnection(String connection) throws AMQException, URLSyntaxException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 23dc244dee..5f93ec6c47 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -58,8 +58,6 @@ public interface AMQConnectionDelegate T executeRetrySupport(FailoverProtectedOperation operation) throws E; - void setIdleTimeout(long l); - int getMaxChannelID(); ProtocolVersion getProtocolVersion(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index a4c6263435..0d1a89a6c0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -41,6 +41,7 @@ import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionClose; import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.ConnectionListener; +import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.ProtocolVersionException; import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; @@ -69,7 +70,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _conn = conn; _qpidConnection = new Connection(); - _qpidConnection.setConnectionListener(this); + _qpidConnection.addConnectionListener(this); } /** @@ -149,40 +150,64 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { if (_logger.isDebugEnabled()) { - _logger.debug("connecting to host: " + brokerDetail.getHost() + - " port: " + brokerDetail.getPort() + - " vhost: " + _conn.getVirtualHost() + - " username: " + _conn.getUsername() + - " password: " + _conn.getPassword()); + _logger.debug("connecting to host: " + brokerDetail.getHost() + + " port: " + brokerDetail.getPort() + " vhost: " + + _conn.getVirtualHost() + " username: " + + _conn.getUsername() + " password: " + + _conn.getPassword()); } - - if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) - { - this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))); - } - else - { - // use the default value set for all connections - this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,ClientProperties.DEFAULT_IDLE_TIMEOUT)); - } - - String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null? - brokerDetail.getProperty("sasl_mechs"): - System.getProperty("qpid.sasl_mechs","PLAIN"); - - _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), - _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs); + + String saslMechs = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS) != null ? + brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS): + System.getProperty("qpid.sasl_mechs", "PLAIN"); + + // Sun SASL Kerberos client uses the + // protocol + servername as the service key. + String protocol = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_PROTOCOL_NAME) != null ? + brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_PROTOCOL_NAME): + System.getProperty("qpid.sasl_protocol", "AMQP"); + + String saslServerName = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_SERVER_NAME) != null ? + brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_SERVER_NAME): + System.getProperty("qpid.sasl_server_name", "localhost"); + + boolean useSSL = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION); + + boolean useSASLEncryption = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION)? + brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION): + Boolean.getBoolean("qpid.sasl_encryption"); + + boolean useTcpNodelay = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY)? + brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY): + Boolean.getBoolean("amqj.tcp_nodelay"); + + + ConnectionSettings conSettings = new ConnectionSettings(); + conSettings.setHost(brokerDetail.getHost()); + conSettings.setPort(brokerDetail.getPort()); + conSettings.setVhost(_conn.getVirtualHost()); + conSettings.setUsername(_conn.getUsername()); + conSettings.setPassword(_conn.getPassword()); + conSettings.setUseSASLEncryption(useSASLEncryption); + conSettings.setUseSSL(useSSL); + conSettings.setSaslMechs(saslMechs); + conSettings.setTcpNodelay(useTcpNodelay); + conSettings.setSaslProtocol(protocol); + conSettings.setSaslServerName(saslServerName); + conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail)); + + _qpidConnection.connect(conSettings); + _conn._connected = true; _conn.setUsername(_qpidConnection.getUserID()); _conn._failoverPolicy.attainedConnection(); - } - catch(ProtocolVersionException pe) + } catch (ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); - } - catch (ConnectionException e) - { - throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); + } catch (ConnectionException e) + { + throw new AMQException(AMQConstant.CHANNEL_ERROR, + "cannot connect to broker", e); } return null; @@ -293,11 +318,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } - public void setIdleTimeout(long l) - { - _qpidConnection.setIdleTimeout((int)l); - } - public int getMaxChannelID() { return Integer.MAX_VALUE; @@ -307,4 +327,30 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { return ProtocolVersion.v0_10; } + + // The idle_timeout prop is in milisecs while + // the new heartbeat prop is in secs + private int getHeartbeatInterval(BrokerDetails brokerDetail) + { + int heartbeat = 0; + if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) + { + _logger.warn("Broker property idle_timeout= is deprecated, please use heartbeat="); + heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))/1000; + } + else if (brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null) + { + heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)); + } + else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null) + { + heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000; + _logger.warn("JVM arg -Didle_timeout= is deprecated, please use -Dqpid.heartbeat="); + } + else + { + heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); + } + return 0; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 6f44f68b37..a8fcdf561c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -304,8 +304,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } - public void setIdleTimeout(long l){} - public int getMaxChannelID() { return (int) (Math.pow(2, 16)-1); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index f8012d044a..c16941b341 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -204,7 +204,7 @@ public class AMQProtocolHandler implements ProtocolEngine IoTransport.connect_0_9(getProtocolSession(), brokerDetail.getHost(), brokerDetail.getPort(), - brokerDetail.useSSL()); + brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL)); _protocolSession.init(); } diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 7227ab247c..c09472fcad 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -34,8 +34,14 @@ public interface BrokerDetails public static final String OPTIONS_RETRY = "retries"; public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; - public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; + public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; // deprecated + public static final String OPTIONS_HEARTBEAT = "heartbeat"; public static final String OPTIONS_SASL_MECHS = "sasl_mechs"; + public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption"; + public static final String OPTIONS_SSL = "ssl"; + public static final String OPTIONS_TCP_NO_DELAY = "tcp_nodelay"; + public static final String OPTIONS_SASL_PROTOCOL_NAME = "sasl_protocol"; + public static final String OPTIONS_SASL_SERVER_NAME = "sasl_server"; public static final int DEFAULT_PORT = 5672; public static final String SOCKET = "socket"; @@ -97,7 +103,7 @@ public interface BrokerDetails void setSSLConfiguration(SSLConfiguration sslConfiguration); - boolean useSSL(); + boolean getBooleanProperty(String propName); String toString(); diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 03ab967c36..6590002bf1 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -40,8 +40,7 @@ public interface ConnectionURL public static final String OPTIONS_SYNC_PUBLISH = "sync_publish"; public static final String OPTIONS_BROKERLIST = "brokerlist"; public static final String OPTIONS_FAILOVER = "failover"; - public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; - public static final String OPTIONS_SSL = "ssl"; + public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange"; public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; -- cgit v1.2.1