From bc9a4cde3a75ce8693649873be9534073b4aeb58 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 21 Jan 2010 02:45:35 +0000 Subject: The commit contains fixes for QPID-2351, QPID-2350 and some ground work for QPID-2352 - Modified Connection.java to add more than one ConnectionListener. This was done to facilitate the SASL encryption patch - QPID-2352. - Changed the access modifier for getSaslClient method to "public" to allow the SaslClient to be retrieved by the SASL encryption code -QPID-2352. - Introduced ConnectionSettings object to hold all the configuration options. Previous constructor methods remains unchanged. - Modified the ClientDelegate to handle heartbeat and idelTimeout value properly. - Added support to specify config options via the connection URL - QPID-2351 - Added support to handle the heartbeat/idle_timeout options properly in the 0-10 code - QPID-2350. However once QPID-2343 is completed, the code will be further simplified. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901506 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/transport/ClientDelegate.java | 41 ++--- .../java/org/apache/qpid/transport/Connection.java | 62 +++++-- .../apache/qpid/transport/ConnectionSettings.java | 190 +++++++++++++++++++++ .../apache/qpid/transport/network/io/IoSender.java | 2 +- .../org/apache/qpid/transport/ConnectionTest.java | 6 +- 5 files changed, 256 insertions(+), 45 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 4200a8352c..b12fbb75e6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -61,24 +61,13 @@ public class ClientDelegate extends ConnectionDelegate } catch (GSSException ignore) {} } - private String vhost; - private String username; - private String password; private List clientMechs; - private String protocol; - private String serverName; + private ConnectionSettings conSettings; - public ClientDelegate(String vhost, String username, String password,String saslMechs) + public ClientDelegate(ConnectionSettings settings) { - this.vhost = vhost; - this.username = username; - this.password = password; - this.clientMechs = Arrays.asList(saslMechs.split(" ")); - - // Looks kinda of silly but the Sun SASL Kerberos client uses the - // protocol + servername as the service key. - this.protocol = System.getProperty("qpid.sasl_protocol","AMQP"); - this.serverName = System.getProperty("qpid.sasl_server_name","localhost"); + this.conSettings = settings; + this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" ")); } public void init(Connection conn, ProtocolHeader hdr) @@ -128,11 +117,16 @@ public class ClientDelegate extends ConnectionDelegate try { + Map saslProps = new HashMap(); + if (conSettings.isUseSASLEncryption()) + { + saslProps.put(Sasl.QOP, "auth-conf"); + } UsernamePasswordCallbackHandler handler = new UsernamePasswordCallbackHandler(); - handler.initialise(username, password); + handler.initialise(conSettings.getUsername(), conSettings.getPassword()); SaslClient sc = Sasl.createSaslClient - (mechs, null, protocol, serverName, null, handler); + (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler); conn.setSaslClient(sc); byte[] response = sc.hasInitialResponse() ? @@ -164,15 +158,16 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { conn.setChannelMax(tune.getChannelMax()); - int hb_interval = calculateHeartbeatInterval(conn, + int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(), tune.getHeartbeatMin(), tune.getHeartbeatMax() ); conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), hb_interval); - conn.setIdleTimeout(hb_interval*1000); - conn.connectionOpen(vhost, null, Option.INSIST); + // The idle timeout is twice the heartbeat amount (in milisecs) + conn.setIdleTimeout(hb_interval*1000*2); + conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST); } @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok) @@ -198,9 +193,9 @@ public class ClientDelegate extends ConnectionDelegate /** * Currently the spec specified the min and max for heartbeat using secs */ - private int calculateHeartbeatInterval(Connection conn,int min, int max) + private int calculateHeartbeatInterval(int heartbeat,int min, int max) { - int i = conn.getIdleTimeout()/1000; + int i = heartbeat; if (i == 0) { log.warn("Idle timeout is zero. Heartbeats are disabled"); @@ -245,7 +240,7 @@ public class ClientDelegate extends ConnectionDelegate private String getUserID() { log.debug("Obtaining userID from kerberos"); - String service = protocol + "@" + serverName; + String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName(); GSSManager manager = GSSManager.getInstance(); try diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 9f1916e1d1..17a13561c8 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -76,7 +76,7 @@ public class Connection extends ConnectionInvoker private State state = NEW; final private Object lock = new Object(); private long timeout = 60000; - private ConnectionListener listener = new DefaultConnectionListener(); + private List listeners = new ArrayList(); private ConnectionException error = null; private int channelMax = 1; @@ -86,7 +86,8 @@ public class Connection extends ConnectionInvoker private int idleTimeout = 0; private String _authorizationID; private String userID; - + private ConnectionSettings conSettings; + // want to make this final private int _connectionId; @@ -97,16 +98,9 @@ public class Connection extends ConnectionInvoker this.delegate = delegate; } - public void setConnectionListener(ConnectionListener listener) + public void addConnectionListener(ConnectionListener listener) { - if (listener == null) - { - this.listener = new DefaultConnectionListener(); - } - else - { - this.listener = listener; - } + listeners.add(listener); } public Sender getSender() @@ -154,7 +148,7 @@ public class Connection extends ConnectionInvoker this.saslClient = saslClient; } - SaslClient getSaslClient() + public SaslClient getSaslClient() { return saslClient; } @@ -170,14 +164,31 @@ public class Connection extends ConnectionInvoker } public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs) + { + ConnectionSettings settings = new ConnectionSettings(); + settings.setHost(host); + settings.setPort(port); + settings.setVhost(vhost); + settings.setUsername(username); + settings.setPassword(password); + settings.setUseSSL(ssl); + settings.setSaslMechs(saslMechs); + connect(settings); + } + + public void connect(ConnectionSettings settings) { synchronized (lock) { + conSettings = settings; state = OPENING; - userID = username; - delegate = new ClientDelegate(vhost, username, password,saslMechs); + userID = settings.getUsername(); + delegate = new ClientDelegate(settings); - IoTransport.connect(host, port, ConnectionBinding.get(this), ssl); + IoTransport.connect(settings.getHost(), + settings.getPort(), + ConnectionBinding.get(this), + settings.isUseSSL()); send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); @@ -218,7 +229,10 @@ public class Connection extends ConnectionInvoker } } - listener.opened(this); + for (ConnectionListener listener: listeners) + { + listener.opened(this); + } } public Session createSession() @@ -407,7 +421,11 @@ public class Connection extends ConnectionInvoker } } - listener.exception(this, e); + for (ConnectionListener listener: listeners) + { + listener.exception(this, e); + } + } public void exception(Throwable t) @@ -460,7 +478,10 @@ public class Connection extends ConnectionInvoker setState(CLOSED); } - listener.closed(this); + for (ConnectionListener listener: listeners) + { + listener.closed(this); + } } public void close() @@ -560,5 +581,10 @@ public class Connection extends ConnectionInvoker { return String.format("conn:%x", System.identityHashCode(this)); } + + public ConnectionSettings getConnectionSettings() + { + return conSettings; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java new file mode 100644 index 0000000000..b25c2d7fd0 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -0,0 +1,190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +public class ConnectionSettings +{ + String protocol = "tcp"; + String host = "localhost"; + String vhost; + String username = "guest"; + String password = "guest"; + String saslMechs = "PLAIN"; + String saslProtocol = "AMQP"; + String saslServerName = "localhost"; + int port = 5672; + int maxChannelCount = 32767; + int maxFrameSize = 65535; + int heartbeatInterval; + boolean useSSL; + boolean useSASLEncryption; + boolean tcpNodelay; + + public boolean isTcpNodelay() + { + return tcpNodelay; + } + + public void setTcpNodelay(boolean tcpNodelay) + { + this.tcpNodelay = tcpNodelay; + } + + public int getHeartbeatInterval() + { + return heartbeatInterval; + } + + public void setHeartbeatInterval(int heartbeatInterval) + { + this.heartbeatInterval = heartbeatInterval; + } + + public String getProtocol() + { + return protocol; + } + + public void setProtocol(String protocol) + { + this.protocol = protocol; + } + + public String getHost() + { + return host; + } + + public void setHost(String host) + { + this.host = host; + } + + public int getPort() + { + return port; + } + + public void setPort(int port) + { + this.port = port; + } + + public String getVhost() + { + return vhost; + } + + public void setVhost(String vhost) + { + this.vhost = vhost; + } + + public String getUsername() + { + return username; + } + + public void setUsername(String username) + { + this.username = username; + } + + public String getPassword() + { + return password; + } + + public void setPassword(String password) + { + this.password = password; + } + + public boolean isUseSSL() + { + return useSSL; + } + + public void setUseSSL(boolean useSSL) + { + this.useSSL = useSSL; + } + + public boolean isUseSASLEncryption() + { + return useSASLEncryption; + } + + public void setUseSASLEncryption(boolean useSASLEncryption) + { + this.useSASLEncryption = useSASLEncryption; + } + + public String getSaslMechs() + { + return saslMechs; + } + + public void setSaslMechs(String saslMechs) + { + this.saslMechs = saslMechs; + } + + public String getSaslProtocol() + { + return saslProtocol; + } + + public void setSaslProtocol(String saslProtocol) + { + this.saslProtocol = saslProtocol; + } + + public String getSaslServerName() + { + return saslServerName; + } + + public void setSaslServerName(String saslServerName) + { + this.saslServerName = saslServerName; + } + + public int getMaxChannelCount() + { + return maxChannelCount; + } + + public void setMaxChannelCount(int maxChannelCount) + { + this.maxChannelCount = maxChannelCount; + } + + public int getMaxFrameSize() + { + return maxFrameSize; + } + + public void setMaxFrameSize(int maxFrameSize) + { + this.maxFrameSize = maxFrameSize; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 30d7e52d33..383fd6131a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -297,7 +297,7 @@ public final class IoSender implements Runnable, Sender { try { - socket.setSoTimeout(i*2); + socket.setSoTimeout(i); } catch (Exception e) { diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 8aa8c5f647..6554b135f5 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -160,7 +160,7 @@ public class ConnectionTest extends TestCase implements SessionListener private Connection connect(final Condition closed) { Connection conn = new Connection(); - conn.setConnectionListener(new ConnectionListener() + conn.addConnectionListener(new ConnectionListener() { public void opened(Connection conn) {} public void exception(Connection conn, ConnectionException exc) @@ -311,7 +311,7 @@ public class ConnectionTest extends TestCase implements SessionListener startServer(); Connection conn = new Connection(); - conn.setConnectionListener(new FailoverConnectionListener()); + conn.addConnectionListener(new FailoverConnectionListener()); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(1); ssn.setSessionListener(new TestSessionListener()); @@ -366,7 +366,7 @@ public class ConnectionTest extends TestCase implements SessionListener startServer(); Connection conn = new Connection(); - conn.setConnectionListener(new FailoverConnectionListener()); + conn.addConnectionListener(new FailoverConnectionListener()); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(1); ssn.setSessionListener(new TestSessionListener()); -- cgit v1.2.1