summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-01-21 02:45:35 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-01-21 02:45:35 +0000
commitbc9a4cde3a75ce8693649873be9534073b4aeb58 (patch)
treeb9ff57313a43fc7e2f5b43ba61655401740e6f2a /java/client/src
parent50c10da7db2c90a4db82c5eeeecb49fa0a3f5015 (diff)
downloadqpid-python-bc9a4cde3a75ce8693649873be9534073b4aeb58.tar.gz
The commit contains fixes for QPID-2351, QPID-2350 and some ground work for QPID-2352
- Modified Connection.java to add more than one ConnectionListener. This was done to facilitate the SASL encryption patch - QPID-2352. - Changed the access modifier for getSaslClient method to "public" to allow the SaslClient to be retrieved by the SASL encryption code -QPID-2352. - Introduced ConnectionSettings object to hold all the configuration options. Previous constructor methods remains unchanged. - Modified the ClientDelegate to handle heartbeat and idelTimeout value properly. - Added support to specify config options via the connection URL - QPID-2351 - Added support to handle the heartbeat/idle_timeout options properly in the 0-10 code - QPID-2350. However once QPID-2343 is completed, the code will be further simplified. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901506 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java114
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java3
8 files changed, 95 insertions, 48 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index 414638dea4..6b5673509e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -255,11 +255,11 @@ public class AMQBrokerDetails implements BrokerDetails
return BrokerDetails.DEFAULT_CONNECT_TIMEOUT;
}
- public boolean useSSL()
+ public boolean getBooleanProperty(String propName)
{
- if (_options.containsKey(ConnectionURL.OPTIONS_SSL))
+ if (_options.containsKey(propName))
{
- return Boolean.parseBoolean(_options.get(ConnectionURL.OPTIONS_SSL));
+ return Boolean.parseBoolean(_options.get(propName));
}
return false;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 125cb6cae3..cdf1167185 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -381,10 +381,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
useSSL
? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
+ + "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'")
: (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
+ + "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 23dc244dee..5f93ec6c47 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -58,8 +58,6 @@ public interface AMQConnectionDelegate
<T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
- void setIdleTimeout(long l);
-
int getMaxChannelID();
ProtocolVersion getProtocolVersion();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index a4c6263435..0d1a89a6c0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -41,6 +41,7 @@ import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionClose;
import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.ConnectionListener;
+import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ProtocolVersionException;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
@@ -69,7 +70,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_conn = conn;
_qpidConnection = new Connection();
- _qpidConnection.setConnectionListener(this);
+ _qpidConnection.addConnectionListener(this);
}
/**
@@ -149,40 +150,64 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
if (_logger.isDebugEnabled())
{
- _logger.debug("connecting to host: " + brokerDetail.getHost() +
- " port: " + brokerDetail.getPort() +
- " vhost: " + _conn.getVirtualHost() +
- " username: " + _conn.getUsername() +
- " password: " + _conn.getPassword());
+ _logger.debug("connecting to host: " + brokerDetail.getHost()
+ + " port: " + brokerDetail.getPort() + " vhost: "
+ + _conn.getVirtualHost() + " username: "
+ + _conn.getUsername() + " password: "
+ + _conn.getPassword());
}
-
- if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
- {
- this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
- }
- else
- {
- // use the default value set for all connections
- this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,ClientProperties.DEFAULT_IDLE_TIMEOUT));
- }
-
- String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null?
- brokerDetail.getProperty("sasl_mechs"):
- System.getProperty("qpid.sasl_mechs","PLAIN");
-
- _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
- _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs);
+
+ String saslMechs = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS) != null ?
+ brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS):
+ System.getProperty("qpid.sasl_mechs", "PLAIN");
+
+ // Sun SASL Kerberos client uses the
+ // protocol + servername as the service key.
+ String protocol = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_PROTOCOL_NAME) != null ?
+ brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_PROTOCOL_NAME):
+ System.getProperty("qpid.sasl_protocol", "AMQP");
+
+ String saslServerName = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_SERVER_NAME) != null ?
+ brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_SERVER_NAME):
+ System.getProperty("qpid.sasl_server_name", "localhost");
+
+ boolean useSSL = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION);
+
+ boolean useSASLEncryption = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION)?
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION):
+ Boolean.getBoolean("qpid.sasl_encryption");
+
+ boolean useTcpNodelay = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY)?
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY):
+ Boolean.getBoolean("amqj.tcp_nodelay");
+
+
+ ConnectionSettings conSettings = new ConnectionSettings();
+ conSettings.setHost(brokerDetail.getHost());
+ conSettings.setPort(brokerDetail.getPort());
+ conSettings.setVhost(_conn.getVirtualHost());
+ conSettings.setUsername(_conn.getUsername());
+ conSettings.setPassword(_conn.getPassword());
+ conSettings.setUseSASLEncryption(useSASLEncryption);
+ conSettings.setUseSSL(useSSL);
+ conSettings.setSaslMechs(saslMechs);
+ conSettings.setTcpNodelay(useTcpNodelay);
+ conSettings.setSaslProtocol(protocol);
+ conSettings.setSaslServerName(saslServerName);
+ conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail));
+
+ _qpidConnection.connect(conSettings);
+
_conn._connected = true;
_conn.setUsername(_qpidConnection.getUserID());
_conn._failoverPolicy.attainedConnection();
- }
- catch(ProtocolVersionException pe)
+ } catch (ProtocolVersionException pe)
{
return new ProtocolVersion(pe.getMajor(), pe.getMinor());
- }
- catch (ConnectionException e)
- {
- throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
+ } catch (ConnectionException e)
+ {
+ throw new AMQException(AMQConstant.CHANNEL_ERROR,
+ "cannot connect to broker", e);
}
return null;
@@ -293,11 +318,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
}
- public void setIdleTimeout(long l)
- {
- _qpidConnection.setIdleTimeout((int)l);
- }
-
public int getMaxChannelID()
{
return Integer.MAX_VALUE;
@@ -307,4 +327,30 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
return ProtocolVersion.v0_10;
}
+
+ // The idle_timeout prop is in milisecs while
+ // the new heartbeat prop is in secs
+ private int getHeartbeatInterval(BrokerDetails brokerDetail)
+ {
+ int heartbeat = 0;
+ if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
+ {
+ _logger.warn("Broker property idle_timeout=<mili_secs> is deprecated, please use heartbeat=<secs>");
+ heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))/1000;
+ }
+ else if (brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null)
+ {
+ heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT));
+ }
+ else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null)
+ {
+ heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
+ _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
+ }
+ else
+ {
+ heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
+ }
+ return 0;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 6f44f68b37..a8fcdf561c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -304,8 +304,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
- public void setIdleTimeout(long l){}
-
public int getMaxChannelID()
{
return (int) (Math.pow(2, 16)-1);
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index f8012d044a..c16941b341 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -204,7 +204,7 @@ public class AMQProtocolHandler implements ProtocolEngine
IoTransport.connect_0_9(getProtocolSession(),
brokerDetail.getHost(),
brokerDetail.getPort(),
- brokerDetail.useSSL());
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
_protocolSession.init();
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index 7227ab247c..c09472fcad 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -34,8 +34,14 @@ public interface BrokerDetails
public static final String OPTIONS_RETRY = "retries";
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
- public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout";
+ public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; // deprecated
+ public static final String OPTIONS_HEARTBEAT = "heartbeat";
public static final String OPTIONS_SASL_MECHS = "sasl_mechs";
+ public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption";
+ public static final String OPTIONS_SSL = "ssl";
+ public static final String OPTIONS_TCP_NO_DELAY = "tcp_nodelay";
+ public static final String OPTIONS_SASL_PROTOCOL_NAME = "sasl_protocol";
+ public static final String OPTIONS_SASL_SERVER_NAME = "sasl_server";
public static final int DEFAULT_PORT = 5672;
public static final String SOCKET = "socket";
@@ -97,7 +103,7 @@ public interface BrokerDetails
void setSSLConfiguration(SSLConfiguration sslConfiguration);
- boolean useSSL();
+ boolean getBooleanProperty(String propName);
String toString();
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 03ab967c36..6590002bf1 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -40,8 +40,7 @@ public interface ConnectionURL
public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
- public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
- public static final String OPTIONS_SSL = "ssl";
+ public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";