summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-01-21 02:45:35 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-01-21 02:45:35 +0000
commitbc9a4cde3a75ce8693649873be9534073b4aeb58 (patch)
treeb9ff57313a43fc7e2f5b43ba61655401740e6f2a /java
parent50c10da7db2c90a4db82c5eeeecb49fa0a3f5015 (diff)
downloadqpid-python-bc9a4cde3a75ce8693649873be9534073b4aeb58.tar.gz
The commit contains fixes for QPID-2351, QPID-2350 and some ground work for QPID-2352
- Modified Connection.java to add more than one ConnectionListener. This was done to facilitate the SASL encryption patch - QPID-2352. - Changed the access modifier for getSaslClient method to "public" to allow the SaslClient to be retrieved by the SASL encryption code -QPID-2352. - Introduced ConnectionSettings object to hold all the configuration options. Previous constructor methods remains unchanged. - Modified the ClientDelegate to handle heartbeat and idelTimeout value properly. - Added support to specify config options via the connection URL - QPID-2351 - Added support to handle the heartbeat/idle_timeout options properly in the 0-10 code - QPID-2350. However once QPID-2343 is completed, the code will be further simplified. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901506 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java114
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java41
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java62
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java190
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java2
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java6
13 files changed, 351 insertions, 93 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index 414638dea4..6b5673509e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -255,11 +255,11 @@ public class AMQBrokerDetails implements BrokerDetails
return BrokerDetails.DEFAULT_CONNECT_TIMEOUT;
}
- public boolean useSSL()
+ public boolean getBooleanProperty(String propName)
{
- if (_options.containsKey(ConnectionURL.OPTIONS_SSL))
+ if (_options.containsKey(propName))
{
- return Boolean.parseBoolean(_options.get(ConnectionURL.OPTIONS_SSL));
+ return Boolean.parseBoolean(_options.get(propName));
}
return false;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 125cb6cae3..cdf1167185 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -381,10 +381,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
useSSL
? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
+ + "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'")
: (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
+ + "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 23dc244dee..5f93ec6c47 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -58,8 +58,6 @@ public interface AMQConnectionDelegate
<T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
- void setIdleTimeout(long l);
-
int getMaxChannelID();
ProtocolVersion getProtocolVersion();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index a4c6263435..0d1a89a6c0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -41,6 +41,7 @@ import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionClose;
import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.ConnectionListener;
+import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ProtocolVersionException;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
@@ -69,7 +70,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_conn = conn;
_qpidConnection = new Connection();
- _qpidConnection.setConnectionListener(this);
+ _qpidConnection.addConnectionListener(this);
}
/**
@@ -149,40 +150,64 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
if (_logger.isDebugEnabled())
{
- _logger.debug("connecting to host: " + brokerDetail.getHost() +
- " port: " + brokerDetail.getPort() +
- " vhost: " + _conn.getVirtualHost() +
- " username: " + _conn.getUsername() +
- " password: " + _conn.getPassword());
+ _logger.debug("connecting to host: " + brokerDetail.getHost()
+ + " port: " + brokerDetail.getPort() + " vhost: "
+ + _conn.getVirtualHost() + " username: "
+ + _conn.getUsername() + " password: "
+ + _conn.getPassword());
}
-
- if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
- {
- this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
- }
- else
- {
- // use the default value set for all connections
- this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,ClientProperties.DEFAULT_IDLE_TIMEOUT));
- }
-
- String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null?
- brokerDetail.getProperty("sasl_mechs"):
- System.getProperty("qpid.sasl_mechs","PLAIN");
-
- _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
- _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs);
+
+ String saslMechs = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS) != null ?
+ brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS):
+ System.getProperty("qpid.sasl_mechs", "PLAIN");
+
+ // Sun SASL Kerberos client uses the
+ // protocol + servername as the service key.
+ String protocol = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_PROTOCOL_NAME) != null ?
+ brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_PROTOCOL_NAME):
+ System.getProperty("qpid.sasl_protocol", "AMQP");
+
+ String saslServerName = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_SERVER_NAME) != null ?
+ brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_SERVER_NAME):
+ System.getProperty("qpid.sasl_server_name", "localhost");
+
+ boolean useSSL = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION);
+
+ boolean useSASLEncryption = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION)?
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION):
+ Boolean.getBoolean("qpid.sasl_encryption");
+
+ boolean useTcpNodelay = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY)?
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY):
+ Boolean.getBoolean("amqj.tcp_nodelay");
+
+
+ ConnectionSettings conSettings = new ConnectionSettings();
+ conSettings.setHost(brokerDetail.getHost());
+ conSettings.setPort(brokerDetail.getPort());
+ conSettings.setVhost(_conn.getVirtualHost());
+ conSettings.setUsername(_conn.getUsername());
+ conSettings.setPassword(_conn.getPassword());
+ conSettings.setUseSASLEncryption(useSASLEncryption);
+ conSettings.setUseSSL(useSSL);
+ conSettings.setSaslMechs(saslMechs);
+ conSettings.setTcpNodelay(useTcpNodelay);
+ conSettings.setSaslProtocol(protocol);
+ conSettings.setSaslServerName(saslServerName);
+ conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail));
+
+ _qpidConnection.connect(conSettings);
+
_conn._connected = true;
_conn.setUsername(_qpidConnection.getUserID());
_conn._failoverPolicy.attainedConnection();
- }
- catch(ProtocolVersionException pe)
+ } catch (ProtocolVersionException pe)
{
return new ProtocolVersion(pe.getMajor(), pe.getMinor());
- }
- catch (ConnectionException e)
- {
- throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
+ } catch (ConnectionException e)
+ {
+ throw new AMQException(AMQConstant.CHANNEL_ERROR,
+ "cannot connect to broker", e);
}
return null;
@@ -293,11 +318,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
}
- public void setIdleTimeout(long l)
- {
- _qpidConnection.setIdleTimeout((int)l);
- }
-
public int getMaxChannelID()
{
return Integer.MAX_VALUE;
@@ -307,4 +327,30 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
return ProtocolVersion.v0_10;
}
+
+ // The idle_timeout prop is in milisecs while
+ // the new heartbeat prop is in secs
+ private int getHeartbeatInterval(BrokerDetails brokerDetail)
+ {
+ int heartbeat = 0;
+ if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
+ {
+ _logger.warn("Broker property idle_timeout=<mili_secs> is deprecated, please use heartbeat=<secs>");
+ heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))/1000;
+ }
+ else if (brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null)
+ {
+ heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT));
+ }
+ else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null)
+ {
+ heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
+ _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
+ }
+ else
+ {
+ heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
+ }
+ return 0;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 6f44f68b37..a8fcdf561c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -304,8 +304,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
- public void setIdleTimeout(long l){}
-
public int getMaxChannelID()
{
return (int) (Math.pow(2, 16)-1);
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index f8012d044a..c16941b341 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -204,7 +204,7 @@ public class AMQProtocolHandler implements ProtocolEngine
IoTransport.connect_0_9(getProtocolSession(),
brokerDetail.getHost(),
brokerDetail.getPort(),
- brokerDetail.useSSL());
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
_protocolSession.init();
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index 7227ab247c..c09472fcad 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -34,8 +34,14 @@ public interface BrokerDetails
public static final String OPTIONS_RETRY = "retries";
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
- public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout";
+ public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; // deprecated
+ public static final String OPTIONS_HEARTBEAT = "heartbeat";
public static final String OPTIONS_SASL_MECHS = "sasl_mechs";
+ public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption";
+ public static final String OPTIONS_SSL = "ssl";
+ public static final String OPTIONS_TCP_NO_DELAY = "tcp_nodelay";
+ public static final String OPTIONS_SASL_PROTOCOL_NAME = "sasl_protocol";
+ public static final String OPTIONS_SASL_SERVER_NAME = "sasl_server";
public static final int DEFAULT_PORT = 5672;
public static final String SOCKET = "socket";
@@ -97,7 +103,7 @@ public interface BrokerDetails
void setSSLConfiguration(SSLConfiguration sslConfiguration);
- boolean useSSL();
+ boolean getBooleanProperty(String propName);
String toString();
diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
index 03ab967c36..6590002bf1 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
@@ -40,8 +40,7 @@ public interface ConnectionURL
public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
- public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
- public static final String OPTIONS_SSL = "ssl";
+ public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 4200a8352c..b12fbb75e6 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -61,24 +61,13 @@ public class ClientDelegate extends ConnectionDelegate
} catch (GSSException ignore) {}
}
- private String vhost;
- private String username;
- private String password;
private List<String> clientMechs;
- private String protocol;
- private String serverName;
+ private ConnectionSettings conSettings;
- public ClientDelegate(String vhost, String username, String password,String saslMechs)
+ public ClientDelegate(ConnectionSettings settings)
{
- this.vhost = vhost;
- this.username = username;
- this.password = password;
- this.clientMechs = Arrays.asList(saslMechs.split(" "));
-
- // Looks kinda of silly but the Sun SASL Kerberos client uses the
- // protocol + servername as the service key.
- this.protocol = System.getProperty("qpid.sasl_protocol","AMQP");
- this.serverName = System.getProperty("qpid.sasl_server_name","localhost");
+ this.conSettings = settings;
+ this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" "));
}
public void init(Connection conn, ProtocolHeader hdr)
@@ -128,11 +117,16 @@ public class ClientDelegate extends ConnectionDelegate
try
{
+ Map<String,Object> saslProps = new HashMap<String,Object>();
+ if (conSettings.isUseSASLEncryption())
+ {
+ saslProps.put(Sasl.QOP, "auth-conf");
+ }
UsernamePasswordCallbackHandler handler =
new UsernamePasswordCallbackHandler();
- handler.initialise(username, password);
+ handler.initialise(conSettings.getUsername(), conSettings.getPassword());
SaslClient sc = Sasl.createSaslClient
- (mechs, null, protocol, serverName, null, handler);
+ (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler);
conn.setSaslClient(sc);
byte[] response = sc.hasInitialResponse() ?
@@ -164,15 +158,16 @@ public class ClientDelegate extends ConnectionDelegate
@Override public void connectionTune(Connection conn, ConnectionTune tune)
{
conn.setChannelMax(tune.getChannelMax());
- int hb_interval = calculateHeartbeatInterval(conn,
+ int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
tune.getHeartbeatMin(),
tune.getHeartbeatMax()
);
conn.connectionTuneOk(tune.getChannelMax(),
tune.getMaxFrameSize(),
hb_interval);
- conn.setIdleTimeout(hb_interval*1000);
- conn.connectionOpen(vhost, null, Option.INSIST);
+ // The idle timeout is twice the heartbeat amount (in milisecs)
+ conn.setIdleTimeout(hb_interval*1000*2);
+ conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
}
@Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
@@ -198,9 +193,9 @@ public class ClientDelegate extends ConnectionDelegate
/**
* Currently the spec specified the min and max for heartbeat using secs
*/
- private int calculateHeartbeatInterval(Connection conn,int min, int max)
+ private int calculateHeartbeatInterval(int heartbeat,int min, int max)
{
- int i = conn.getIdleTimeout()/1000;
+ int i = heartbeat;
if (i == 0)
{
log.warn("Idle timeout is zero. Heartbeats are disabled");
@@ -245,7 +240,7 @@ public class ClientDelegate extends ConnectionDelegate
private String getUserID()
{
log.debug("Obtaining userID from kerberos");
- String service = protocol + "@" + serverName;
+ String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName();
GSSManager manager = GSSManager.getInstance();
try
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 9f1916e1d1..17a13561c8 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -76,7 +76,7 @@ public class Connection extends ConnectionInvoker
private State state = NEW;
final private Object lock = new Object();
private long timeout = 60000;
- private ConnectionListener listener = new DefaultConnectionListener();
+ private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>();
private ConnectionException error = null;
private int channelMax = 1;
@@ -86,7 +86,8 @@ public class Connection extends ConnectionInvoker
private int idleTimeout = 0;
private String _authorizationID;
private String userID;
-
+ private ConnectionSettings conSettings;
+
// want to make this final
private int _connectionId;
@@ -97,16 +98,9 @@ public class Connection extends ConnectionInvoker
this.delegate = delegate;
}
- public void setConnectionListener(ConnectionListener listener)
+ public void addConnectionListener(ConnectionListener listener)
{
- if (listener == null)
- {
- this.listener = new DefaultConnectionListener();
- }
- else
- {
- this.listener = listener;
- }
+ listeners.add(listener);
}
public Sender<ProtocolEvent> getSender()
@@ -154,7 +148,7 @@ public class Connection extends ConnectionInvoker
this.saslClient = saslClient;
}
- SaslClient getSaslClient()
+ public SaslClient getSaslClient()
{
return saslClient;
}
@@ -171,13 +165,30 @@ public class Connection extends ConnectionInvoker
public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs)
{
+ ConnectionSettings settings = new ConnectionSettings();
+ settings.setHost(host);
+ settings.setPort(port);
+ settings.setVhost(vhost);
+ settings.setUsername(username);
+ settings.setPassword(password);
+ settings.setUseSSL(ssl);
+ settings.setSaslMechs(saslMechs);
+ connect(settings);
+ }
+
+ public void connect(ConnectionSettings settings)
+ {
synchronized (lock)
{
+ conSettings = settings;
state = OPENING;
- userID = username;
- delegate = new ClientDelegate(vhost, username, password,saslMechs);
+ userID = settings.getUsername();
+ delegate = new ClientDelegate(settings);
- IoTransport.connect(host, port, ConnectionBinding.get(this), ssl);
+ IoTransport.connect(settings.getHost(),
+ settings.getPort(),
+ ConnectionBinding.get(this),
+ settings.isUseSSL());
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
@@ -218,7 +229,10 @@ public class Connection extends ConnectionInvoker
}
}
- listener.opened(this);
+ for (ConnectionListener listener: listeners)
+ {
+ listener.opened(this);
+ }
}
public Session createSession()
@@ -407,7 +421,11 @@ public class Connection extends ConnectionInvoker
}
}
- listener.exception(this, e);
+ for (ConnectionListener listener: listeners)
+ {
+ listener.exception(this, e);
+ }
+
}
public void exception(Throwable t)
@@ -460,7 +478,10 @@ public class Connection extends ConnectionInvoker
setState(CLOSED);
}
- listener.closed(this);
+ for (ConnectionListener listener: listeners)
+ {
+ listener.closed(this);
+ }
}
public void close()
@@ -560,5 +581,10 @@ public class Connection extends ConnectionInvoker
{
return String.format("conn:%x", System.identityHashCode(this));
}
+
+ public ConnectionSettings getConnectionSettings()
+ {
+ return conSettings;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
new file mode 100644
index 0000000000..b25c2d7fd0
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public class ConnectionSettings
+{
+ String protocol = "tcp";
+ String host = "localhost";
+ String vhost;
+ String username = "guest";
+ String password = "guest";
+ String saslMechs = "PLAIN";
+ String saslProtocol = "AMQP";
+ String saslServerName = "localhost";
+ int port = 5672;
+ int maxChannelCount = 32767;
+ int maxFrameSize = 65535;
+ int heartbeatInterval;
+ boolean useSSL;
+ boolean useSASLEncryption;
+ boolean tcpNodelay;
+
+ public boolean isTcpNodelay()
+ {
+ return tcpNodelay;
+ }
+
+ public void setTcpNodelay(boolean tcpNodelay)
+ {
+ this.tcpNodelay = tcpNodelay;
+ }
+
+ public int getHeartbeatInterval()
+ {
+ return heartbeatInterval;
+ }
+
+ public void setHeartbeatInterval(int heartbeatInterval)
+ {
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ public String getProtocol()
+ {
+ return protocol;
+ }
+
+ public void setProtocol(String protocol)
+ {
+ this.protocol = protocol;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public void setHost(String host)
+ {
+ this.host = host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public void setPort(int port)
+ {
+ this.port = port;
+ }
+
+ public String getVhost()
+ {
+ return vhost;
+ }
+
+ public void setVhost(String vhost)
+ {
+ this.vhost = vhost;
+ }
+
+ public String getUsername()
+ {
+ return username;
+ }
+
+ public void setUsername(String username)
+ {
+ this.username = username;
+ }
+
+ public String getPassword()
+ {
+ return password;
+ }
+
+ public void setPassword(String password)
+ {
+ this.password = password;
+ }
+
+ public boolean isUseSSL()
+ {
+ return useSSL;
+ }
+
+ public void setUseSSL(boolean useSSL)
+ {
+ this.useSSL = useSSL;
+ }
+
+ public boolean isUseSASLEncryption()
+ {
+ return useSASLEncryption;
+ }
+
+ public void setUseSASLEncryption(boolean useSASLEncryption)
+ {
+ this.useSASLEncryption = useSASLEncryption;
+ }
+
+ public String getSaslMechs()
+ {
+ return saslMechs;
+ }
+
+ public void setSaslMechs(String saslMechs)
+ {
+ this.saslMechs = saslMechs;
+ }
+
+ public String getSaslProtocol()
+ {
+ return saslProtocol;
+ }
+
+ public void setSaslProtocol(String saslProtocol)
+ {
+ this.saslProtocol = saslProtocol;
+ }
+
+ public String getSaslServerName()
+ {
+ return saslServerName;
+ }
+
+ public void setSaslServerName(String saslServerName)
+ {
+ this.saslServerName = saslServerName;
+ }
+
+ public int getMaxChannelCount()
+ {
+ return maxChannelCount;
+ }
+
+ public void setMaxChannelCount(int maxChannelCount)
+ {
+ this.maxChannelCount = maxChannelCount;
+ }
+
+ public int getMaxFrameSize()
+ {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(int maxFrameSize)
+ {
+ this.maxFrameSize = maxFrameSize;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 30d7e52d33..383fd6131a 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -297,7 +297,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
{
try
{
- socket.setSoTimeout(i*2);
+ socket.setSoTimeout(i);
}
catch (Exception e)
{
diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index 8aa8c5f647..6554b135f5 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -160,7 +160,7 @@ public class ConnectionTest extends TestCase implements SessionListener
private Connection connect(final Condition closed)
{
Connection conn = new Connection();
- conn.setConnectionListener(new ConnectionListener()
+ conn.addConnectionListener(new ConnectionListener()
{
public void opened(Connection conn) {}
public void exception(Connection conn, ConnectionException exc)
@@ -311,7 +311,7 @@ public class ConnectionTest extends TestCase implements SessionListener
startServer();
Connection conn = new Connection();
- conn.setConnectionListener(new FailoverConnectionListener());
+ conn.addConnectionListener(new FailoverConnectionListener());
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession(1);
ssn.setSessionListener(new TestSessionListener());
@@ -366,7 +366,7 @@ public class ConnectionTest extends TestCase implements SessionListener
startServer();
Connection conn = new Connection();
- conn.setConnectionListener(new FailoverConnectionListener());
+ conn.addConnectionListener(new FailoverConnectionListener());
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession(1);
ssn.setSessionListener(new TestSessionListener());