diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-01-21 02:45:35 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-01-21 02:45:35 +0000 |
| commit | bc9a4cde3a75ce8693649873be9534073b4aeb58 (patch) | |
| tree | b9ff57313a43fc7e2f5b43ba61655401740e6f2a /java | |
| parent | 50c10da7db2c90a4db82c5eeeecb49fa0a3f5015 (diff) | |
| download | qpid-python-bc9a4cde3a75ce8693649873be9534073b4aeb58.tar.gz | |
The commit contains fixes for QPID-2351, QPID-2350 and some ground work for QPID-2352
- Modified Connection.java to add more than one ConnectionListener. This was done to facilitate the SASL encryption patch - QPID-2352.
- Changed the access modifier for getSaslClient method to "public" to allow the SaslClient to be retrieved by the SASL encryption code -QPID-2352.
- Introduced ConnectionSettings object to hold all the configuration options. Previous constructor methods remains unchanged.
- Modified the ClientDelegate to handle heartbeat and idelTimeout value properly.
- Added support to specify config options via the connection URL - QPID-2351
- Added support to handle the heartbeat/idle_timeout options properly in the 0-10 code - QPID-2350. However once QPID-2343 is completed, the code will be further simplified.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901506 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
13 files changed, 351 insertions, 93 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 414638dea4..6b5673509e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -255,11 +255,11 @@ public class AMQBrokerDetails implements BrokerDetails return BrokerDetails.DEFAULT_CONNECT_TIMEOUT; } - public boolean useSSL() + public boolean getBooleanProperty(String propName) { - if (_options.containsKey(ConnectionURL.OPTIONS_SSL)) + if (_options.containsKey(propName)) { - return Boolean.parseBoolean(_options.get(ConnectionURL.OPTIONS_SSL)); + return Boolean.parseBoolean(_options.get(propName)); } return false; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 125cb6cae3..cdf1167185 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -381,10 +381,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect useSSL ? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port - + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'") + + "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'") : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port - + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig); + + "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig); } public AMQConnection(String connection) throws AMQException, URLSyntaxException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 23dc244dee..5f93ec6c47 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -58,8 +58,6 @@ public interface AMQConnectionDelegate <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; - void setIdleTimeout(long l); - int getMaxChannelID(); ProtocolVersion getProtocolVersion(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index a4c6263435..0d1a89a6c0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -41,6 +41,7 @@ import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionClose; import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.ConnectionListener; +import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.ProtocolVersionException; import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; @@ -69,7 +70,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _conn = conn; _qpidConnection = new Connection(); - _qpidConnection.setConnectionListener(this); + _qpidConnection.addConnectionListener(this); } /** @@ -149,40 +150,64 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { if (_logger.isDebugEnabled()) { - _logger.debug("connecting to host: " + brokerDetail.getHost() + - " port: " + brokerDetail.getPort() + - " vhost: " + _conn.getVirtualHost() + - " username: " + _conn.getUsername() + - " password: " + _conn.getPassword()); + _logger.debug("connecting to host: " + brokerDetail.getHost() + + " port: " + brokerDetail.getPort() + " vhost: " + + _conn.getVirtualHost() + " username: " + + _conn.getUsername() + " password: " + + _conn.getPassword()); } - - if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) - { - this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))); - } - else - { - // use the default value set for all connections - this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,ClientProperties.DEFAULT_IDLE_TIMEOUT)); - } - - String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null? - brokerDetail.getProperty("sasl_mechs"): - System.getProperty("qpid.sasl_mechs","PLAIN"); - - _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), - _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs); + + String saslMechs = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS) != null ? + brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS): + System.getProperty("qpid.sasl_mechs", "PLAIN"); + + // Sun SASL Kerberos client uses the + // protocol + servername as the service key. + String protocol = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_PROTOCOL_NAME) != null ? + brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_PROTOCOL_NAME): + System.getProperty("qpid.sasl_protocol", "AMQP"); + + String saslServerName = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_SERVER_NAME) != null ? + brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_SERVER_NAME): + System.getProperty("qpid.sasl_server_name", "localhost"); + + boolean useSSL = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION); + + boolean useSASLEncryption = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION)? + brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION): + Boolean.getBoolean("qpid.sasl_encryption"); + + boolean useTcpNodelay = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY)? + brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY): + Boolean.getBoolean("amqj.tcp_nodelay"); + + + ConnectionSettings conSettings = new ConnectionSettings(); + conSettings.setHost(brokerDetail.getHost()); + conSettings.setPort(brokerDetail.getPort()); + conSettings.setVhost(_conn.getVirtualHost()); + conSettings.setUsername(_conn.getUsername()); + conSettings.setPassword(_conn.getPassword()); + conSettings.setUseSASLEncryption(useSASLEncryption); + conSettings.setUseSSL(useSSL); + conSettings.setSaslMechs(saslMechs); + conSettings.setTcpNodelay(useTcpNodelay); + conSettings.setSaslProtocol(protocol); + conSettings.setSaslServerName(saslServerName); + conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail)); + + _qpidConnection.connect(conSettings); + _conn._connected = true; _conn.setUsername(_qpidConnection.getUserID()); _conn._failoverPolicy.attainedConnection(); - } - catch(ProtocolVersionException pe) + } catch (ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); - } - catch (ConnectionException e) - { - throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); + } catch (ConnectionException e) + { + throw new AMQException(AMQConstant.CHANNEL_ERROR, + "cannot connect to broker", e); } return null; @@ -293,11 +318,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } - public void setIdleTimeout(long l) - { - _qpidConnection.setIdleTimeout((int)l); - } - public int getMaxChannelID() { return Integer.MAX_VALUE; @@ -307,4 +327,30 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { return ProtocolVersion.v0_10; } + + // The idle_timeout prop is in milisecs while + // the new heartbeat prop is in secs + private int getHeartbeatInterval(BrokerDetails brokerDetail) + { + int heartbeat = 0; + if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) + { + _logger.warn("Broker property idle_timeout=<mili_secs> is deprecated, please use heartbeat=<secs>"); + heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))/1000; + } + else if (brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null) + { + heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)); + } + else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null) + { + heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000; + _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>"); + } + else + { + heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); + } + return 0; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 6f44f68b37..a8fcdf561c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -304,8 +304,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } - public void setIdleTimeout(long l){} - public int getMaxChannelID() { return (int) (Math.pow(2, 16)-1); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index f8012d044a..c16941b341 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -204,7 +204,7 @@ public class AMQProtocolHandler implements ProtocolEngine IoTransport.connect_0_9(getProtocolSession(), brokerDetail.getHost(), brokerDetail.getPort(), - brokerDetail.useSSL()); + brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL)); _protocolSession.init(); } diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 7227ab247c..c09472fcad 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -34,8 +34,14 @@ public interface BrokerDetails public static final String OPTIONS_RETRY = "retries"; public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; - public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; + public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; // deprecated + public static final String OPTIONS_HEARTBEAT = "heartbeat"; public static final String OPTIONS_SASL_MECHS = "sasl_mechs"; + public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption"; + public static final String OPTIONS_SSL = "ssl"; + public static final String OPTIONS_TCP_NO_DELAY = "tcp_nodelay"; + public static final String OPTIONS_SASL_PROTOCOL_NAME = "sasl_protocol"; + public static final String OPTIONS_SASL_SERVER_NAME = "sasl_server"; public static final int DEFAULT_PORT = 5672; public static final String SOCKET = "socket"; @@ -97,7 +103,7 @@ public interface BrokerDetails void setSSLConfiguration(SSLConfiguration sslConfiguration); - boolean useSSL(); + boolean getBooleanProperty(String propName); String toString(); diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 03ab967c36..6590002bf1 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -40,8 +40,7 @@ public interface ConnectionURL public static final String OPTIONS_SYNC_PUBLISH = "sync_publish"; public static final String OPTIONS_BROKERLIST = "brokerlist"; public static final String OPTIONS_FAILOVER = "failover"; - public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; - public static final String OPTIONS_SSL = "ssl"; + public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange"; public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 4200a8352c..b12fbb75e6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -61,24 +61,13 @@ public class ClientDelegate extends ConnectionDelegate } catch (GSSException ignore) {} } - private String vhost; - private String username; - private String password; private List<String> clientMechs; - private String protocol; - private String serverName; + private ConnectionSettings conSettings; - public ClientDelegate(String vhost, String username, String password,String saslMechs) + public ClientDelegate(ConnectionSettings settings) { - this.vhost = vhost; - this.username = username; - this.password = password; - this.clientMechs = Arrays.asList(saslMechs.split(" ")); - - // Looks kinda of silly but the Sun SASL Kerberos client uses the - // protocol + servername as the service key. - this.protocol = System.getProperty("qpid.sasl_protocol","AMQP"); - this.serverName = System.getProperty("qpid.sasl_server_name","localhost"); + this.conSettings = settings; + this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" ")); } public void init(Connection conn, ProtocolHeader hdr) @@ -128,11 +117,16 @@ public class ClientDelegate extends ConnectionDelegate try { + Map<String,Object> saslProps = new HashMap<String,Object>(); + if (conSettings.isUseSASLEncryption()) + { + saslProps.put(Sasl.QOP, "auth-conf"); + } UsernamePasswordCallbackHandler handler = new UsernamePasswordCallbackHandler(); - handler.initialise(username, password); + handler.initialise(conSettings.getUsername(), conSettings.getPassword()); SaslClient sc = Sasl.createSaslClient - (mechs, null, protocol, serverName, null, handler); + (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler); conn.setSaslClient(sc); byte[] response = sc.hasInitialResponse() ? @@ -164,15 +158,16 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { conn.setChannelMax(tune.getChannelMax()); - int hb_interval = calculateHeartbeatInterval(conn, + int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(), tune.getHeartbeatMin(), tune.getHeartbeatMax() ); conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), hb_interval); - conn.setIdleTimeout(hb_interval*1000); - conn.connectionOpen(vhost, null, Option.INSIST); + // The idle timeout is twice the heartbeat amount (in milisecs) + conn.setIdleTimeout(hb_interval*1000*2); + conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST); } @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) @@ -198,9 +193,9 @@ public class ClientDelegate extends ConnectionDelegate /** * Currently the spec specified the min and max for heartbeat using secs */ - private int calculateHeartbeatInterval(Connection conn,int min, int max) + private int calculateHeartbeatInterval(int heartbeat,int min, int max) { - int i = conn.getIdleTimeout()/1000; + int i = heartbeat; if (i == 0) { log.warn("Idle timeout is zero. Heartbeats are disabled"); @@ -245,7 +240,7 @@ public class ClientDelegate extends ConnectionDelegate private String getUserID() { log.debug("Obtaining userID from kerberos"); - String service = protocol + "@" + serverName; + String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName(); GSSManager manager = GSSManager.getInstance(); try diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 9f1916e1d1..17a13561c8 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -76,7 +76,7 @@ public class Connection extends ConnectionInvoker private State state = NEW; final private Object lock = new Object(); private long timeout = 60000; - private ConnectionListener listener = new DefaultConnectionListener(); + private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>(); private ConnectionException error = null; private int channelMax = 1; @@ -86,7 +86,8 @@ public class Connection extends ConnectionInvoker private int idleTimeout = 0; private String _authorizationID; private String userID; - + private ConnectionSettings conSettings; + // want to make this final private int _connectionId; @@ -97,16 +98,9 @@ public class Connection extends ConnectionInvoker this.delegate = delegate; } - public void setConnectionListener(ConnectionListener listener) + public void addConnectionListener(ConnectionListener listener) { - if (listener == null) - { - this.listener = new DefaultConnectionListener(); - } - else - { - this.listener = listener; - } + listeners.add(listener); } public Sender<ProtocolEvent> getSender() @@ -154,7 +148,7 @@ public class Connection extends ConnectionInvoker this.saslClient = saslClient; } - SaslClient getSaslClient() + public SaslClient getSaslClient() { return saslClient; } @@ -171,13 +165,30 @@ public class Connection extends ConnectionInvoker public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs) { + ConnectionSettings settings = new ConnectionSettings(); + settings.setHost(host); + settings.setPort(port); + settings.setVhost(vhost); + settings.setUsername(username); + settings.setPassword(password); + settings.setUseSSL(ssl); + settings.setSaslMechs(saslMechs); + connect(settings); + } + + public void connect(ConnectionSettings settings) + { synchronized (lock) { + conSettings = settings; state = OPENING; - userID = username; - delegate = new ClientDelegate(vhost, username, password,saslMechs); + userID = settings.getUsername(); + delegate = new ClientDelegate(settings); - IoTransport.connect(host, port, ConnectionBinding.get(this), ssl); + IoTransport.connect(settings.getHost(), + settings.getPort(), + ConnectionBinding.get(this), + settings.isUseSSL()); send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); @@ -218,7 +229,10 @@ public class Connection extends ConnectionInvoker } } - listener.opened(this); + for (ConnectionListener listener: listeners) + { + listener.opened(this); + } } public Session createSession() @@ -407,7 +421,11 @@ public class Connection extends ConnectionInvoker } } - listener.exception(this, e); + for (ConnectionListener listener: listeners) + { + listener.exception(this, e); + } + } public void exception(Throwable t) @@ -460,7 +478,10 @@ public class Connection extends ConnectionInvoker setState(CLOSED); } - listener.closed(this); + for (ConnectionListener listener: listeners) + { + listener.closed(this); + } } public void close() @@ -560,5 +581,10 @@ public class Connection extends ConnectionInvoker { return String.format("conn:%x", System.identityHashCode(this)); } + + public ConnectionSettings getConnectionSettings() + { + return conSettings; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java new file mode 100644 index 0000000000..b25c2d7fd0 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -0,0 +1,190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +public class ConnectionSettings +{ + String protocol = "tcp"; + String host = "localhost"; + String vhost; + String username = "guest"; + String password = "guest"; + String saslMechs = "PLAIN"; + String saslProtocol = "AMQP"; + String saslServerName = "localhost"; + int port = 5672; + int maxChannelCount = 32767; + int maxFrameSize = 65535; + int heartbeatInterval; + boolean useSSL; + boolean useSASLEncryption; + boolean tcpNodelay; + + public boolean isTcpNodelay() + { + return tcpNodelay; + } + + public void setTcpNodelay(boolean tcpNodelay) + { + this.tcpNodelay = tcpNodelay; + } + + public int getHeartbeatInterval() + { + return heartbeatInterval; + } + + public void setHeartbeatInterval(int heartbeatInterval) + { + this.heartbeatInterval = heartbeatInterval; + } + + public String getProtocol() + { + return protocol; + } + + public void setProtocol(String protocol) + { + this.protocol = protocol; + } + + public String getHost() + { + return host; + } + + public void setHost(String host) + { + this.host = host; + } + + public int getPort() + { + return port; + } + + public void setPort(int port) + { + this.port = port; + } + + public String getVhost() + { + return vhost; + } + + public void setVhost(String vhost) + { + this.vhost = vhost; + } + + public String getUsername() + { + return username; + } + + public void setUsername(String username) + { + this.username = username; + } + + public String getPassword() + { + return password; + } + + public void setPassword(String password) + { + this.password = password; + } + + public boolean isUseSSL() + { + return useSSL; + } + + public void setUseSSL(boolean useSSL) + { + this.useSSL = useSSL; + } + + public boolean isUseSASLEncryption() + { + return useSASLEncryption; + } + + public void setUseSASLEncryption(boolean useSASLEncryption) + { + this.useSASLEncryption = useSASLEncryption; + } + + public String getSaslMechs() + { + return saslMechs; + } + + public void setSaslMechs(String saslMechs) + { + this.saslMechs = saslMechs; + } + + public String getSaslProtocol() + { + return saslProtocol; + } + + public void setSaslProtocol(String saslProtocol) + { + this.saslProtocol = saslProtocol; + } + + public String getSaslServerName() + { + return saslServerName; + } + + public void setSaslServerName(String saslServerName) + { + this.saslServerName = saslServerName; + } + + public int getMaxChannelCount() + { + return maxChannelCount; + } + + public void setMaxChannelCount(int maxChannelCount) + { + this.maxChannelCount = maxChannelCount; + } + + public int getMaxFrameSize() + { + return maxFrameSize; + } + + public void setMaxFrameSize(int maxFrameSize) + { + this.maxFrameSize = maxFrameSize; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 30d7e52d33..383fd6131a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -297,7 +297,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> { try { - socket.setSoTimeout(i*2); + socket.setSoTimeout(i); } catch (Exception e) { diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 8aa8c5f647..6554b135f5 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -160,7 +160,7 @@ public class ConnectionTest extends TestCase implements SessionListener private Connection connect(final Condition closed) { Connection conn = new Connection(); - conn.setConnectionListener(new ConnectionListener() + conn.addConnectionListener(new ConnectionListener() { public void opened(Connection conn) {} public void exception(Connection conn, ConnectionException exc) @@ -311,7 +311,7 @@ public class ConnectionTest extends TestCase implements SessionListener startServer(); Connection conn = new Connection(); - conn.setConnectionListener(new FailoverConnectionListener()); + conn.addConnectionListener(new FailoverConnectionListener()); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(1); ssn.setSessionListener(new TestSessionListener()); @@ -366,7 +366,7 @@ public class ConnectionTest extends TestCase implements SessionListener startServer(); Connection conn = new Connection(); - conn.setConnectionListener(new FailoverConnectionListener()); + conn.addConnectionListener(new FailoverConnectionListener()); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(1); ssn.setSessionListener(new TestSessionListener()); |
