diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
| commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
| tree | ce493e10baa95f44be8beb5778ce51783463196d /java/common/src | |
| parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
| download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
38 files changed, 458 insertions, 161 deletions
diff --git a/java/common/src/main/java/common.bnd b/java/common/src/main/java/common.bnd index 9149986aa3..b34f8deacc 100755 --- a/java/common/src/main/java/common.bnd +++ b/java/common/src/main/java/common.bnd @@ -17,7 +17,7 @@ # under the License.
#
-ver: 0.17.0
+ver: 0.19.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java index 86d439d269..2d54e35191 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -40,6 +40,8 @@ public class AMQException extends Exception /** Holds the AMQ error code constant associated with this exception. */ private AMQConstant _errorCode; + private boolean _isHardError; + /** * Creates an exception with an optional error code, optional message and optional underlying cause. * @@ -49,8 +51,24 @@ public class AMQException extends Exception */ public AMQException(AMQConstant errorCode, String msg, Throwable cause) { + // isHardError is defaulted to true to avoid unnessacery modification to + // existing code. + this(errorCode,true,msg,cause); + } + + /** + * Creates an exception with an optional error code, optional message and optional underlying cause. + * + * @param errorCode The error code. May be null if not to be set. + * @param isHardError Denotes if the underlying error is considered a hard error. + * @param msg The exception message. May be null if not to be set. + * @param cause The underlying cause of the exception. May be null if not to be set. + */ + public AMQException(AMQConstant errorCode, boolean isHardError, String msg, Throwable cause) + { super(((msg == null) ? "" : msg), cause); _errorCode = errorCode; + _isHardError = isHardError; } /* @@ -92,7 +110,7 @@ public class AMQException extends Exception public boolean isHardError() { - return true; + return _isHardError; } /** diff --git a/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java b/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java index bbc569839a..7744b128ce 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQProtocolException.java @@ -1,8 +1,5 @@ -package org.apache.qpid; - -import org.apache.qpid.protocol.AMQConstant; - -/* Licensed to the Apache Software Foundation (ASF) under one +/* +* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -19,6 +16,10 @@ import org.apache.qpid.protocol.AMQConstant; * specific language governing permissions and limitations * under the License. */ +package org.apache.qpid; + +import org.apache.qpid.protocol.AMQConstant; + public class AMQProtocolException extends AMQException { diff --git a/java/common/src/main/java/org/apache/qpid/api/Message.java b/java/common/src/main/java/org/apache/qpid/api/Message.java index 49c7be162c..c0427c2f37 100644 --- a/java/common/src/main/java/org/apache/qpid/api/Message.java +++ b/java/common/src/main/java/org/apache/qpid/api/Message.java @@ -1,12 +1,3 @@ -package org.apache.qpid.api; - -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageProperties; - -import java.io.IOException; -import java.nio.ByteBuffer; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -25,6 +16,14 @@ import java.nio.ByteBuffer; * specific language governing permissions and limitations * under the License. */ +package org.apache.qpid.api; + +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageProperties; + +import java.io.IOException; +import java.nio.ByteBuffer; public interface Message { diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 3227bb6fc2..97fbd43ea0 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -168,4 +168,28 @@ public class ClientProperties public static final String SEND_BUFFER_SIZE_PROP_NAME = "qpid.send_buffer_size"; @Deprecated public static final String LEGACY_SEND_BUFFER_SIZE_PROP_NAME = "amqj.sendBufferSize"; + + /** + * System property to set the time (in millis) to wait before failing when sending and + * the client has been flow controlled by the broker. + */ + public static final String QPID_FLOW_CONTROL_WAIT_FAILURE = "qpid.flow_control_wait_failure"; + + /** + * Default time (in millis) to wait before failing when sending and the client has been + * flow controlled by the broker. + */ + public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 60000L; + + /** + * System property to set the time (in millis) between log notifications that a + * send is waiting because the client was flow controlled by the broker. + */ + public static final String QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD = "qpid.flow_control_wait_notify_period"; + + /** + * Default time (in millis) between log notifications that a send is + * waiting because the client was flow controlled by the broker. + */ + public static final long DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD = 5000L; } diff --git a/java/common/src/main/java/org/apache/qpid/configuration/Validator.java b/java/common/src/main/java/org/apache/qpid/configuration/Validator.java index 13f7954bbc..e65bddf10b 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/Validator.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/Validator.java @@ -1,4 +1,3 @@ -package org.apache.qpid.configuration; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.configuration; * under the License. * */ +package org.apache.qpid.configuration; public interface Validator diff --git a/java/common/src/main/java/org/apache/qpid/filter/FilterableMessage.java b/java/common/src/main/java/org/apache/qpid/filter/FilterableMessage.java index b5b00ae70f..98c34bafd2 100644 --- a/java/common/src/main/java/org/apache/qpid/filter/FilterableMessage.java +++ b/java/common/src/main/java/org/apache/qpid/filter/FilterableMessage.java @@ -1,21 +1,21 @@ -package org.apache.qpid.filter; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * <p/> + * * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.qpid.filter; + public interface FilterableMessage { diff --git a/java/common/src/main/java/org/apache/qpid/filter/SelectorParsingException.java b/java/common/src/main/java/org/apache/qpid/filter/SelectorParsingException.java index f08b3df155..53909a0009 100644 --- a/java/common/src/main/java/org/apache/qpid/filter/SelectorParsingException.java +++ b/java/common/src/main/java/org/apache/qpid/filter/SelectorParsingException.java @@ -1,5 +1,3 @@ -package org.apache.qpid.filter; - /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -7,15 +5,17 @@ package org.apache.qpid.filter; * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * <p/> + * * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.qpid.filter; + public class SelectorParsingException extends RuntimeException { public SelectorParsingException(String s) diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java index d6f518b123..571570d7b4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -1,5 +1,3 @@ -package org.apache.qpid.framing; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,6 +18,8 @@ package org.apache.qpid.framing; * under the License. * */ +package org.apache.qpid.framing; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java index e2db8906a1..25ab60327a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java @@ -1,5 +1,3 @@ -package org.apache.qpid.framing; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,6 +18,9 @@ package org.apache.qpid.framing; * under the License. * */ +package org.apache.qpid.framing; + + public interface AMQShortStringTokenizer { diff --git a/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java index c9ff180c54..b2967bb0bb 100644 --- a/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java +++ b/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java @@ -39,7 +39,6 @@ import java.security.KeyStore; */ public class SSLContextFactory { - public static final String JAVA_KEY_STORE_CODE = "JKS"; public static final String TRANSPORT_LAYER_SECURITY_CODE = "TLS"; private SSLContextFactory() @@ -48,28 +47,32 @@ public class SSLContextFactory } public static SSLContext buildServerContext(final String keyStorePath, - final String keyStorePassword, final String keyManagerFactoryAlgorithm) + final String keyStorePassword, final String keyStoreType, + final String keyManagerFactoryAlgorithm) throws GeneralSecurityException, IOException { - return buildContext(null, null, null, keyStorePath, keyStorePassword, + return buildContext(null, null, null, null, keyStorePath, keyStorePassword, keyStoreType, keyManagerFactoryAlgorithm, null); } public static SSLContext buildClientContext(final String trustStorePath, - final String trustStorePassword, final String trustManagerFactoryAlgorithm, - final String keyStorePath, final String keyStorePassword, + final String trustStorePassword, final String trustStoreType, + final String trustManagerFactoryAlgorithm, final String keyStorePath, + final String keyStorePassword, final String keyStoreType, final String keyManagerFactoryAlgorithm, final String certAlias) throws GeneralSecurityException, IOException { - return buildContext(trustStorePath, trustStorePassword, - trustManagerFactoryAlgorithm, keyStorePath, keyStorePassword, + return buildContext(trustStorePath, trustStorePassword, trustStoreType, + trustManagerFactoryAlgorithm, keyStorePath, keyStorePassword, keyStoreType, keyManagerFactoryAlgorithm, certAlias); } private static SSLContext buildContext(final String trustStorePath, - final String trustStorePassword, final String trustManagerFactoryAlgorithm, - final String keyStorePath, final String keyStorePassword, - final String keyManagerFactoryAlgorithm, final String certAlias) + final String trustStorePassword, final String trustStoreType, + final String trustManagerFactoryAlgorithm, + final String keyStorePath, final String keyStorePassword, + final String keyStoreType, final String keyManagerFactoryAlgorithm, + final String certAlias) throws GeneralSecurityException, IOException { // Initialize the SSLContext to work with our key managers. @@ -82,7 +85,7 @@ public class SSLContextFactory if (trustStorePath != null) { final KeyStore ts = SSLUtil.getInitializedKeyStore(trustStorePath, - trustStorePassword); + trustStorePassword, trustStoreType); final TrustManagerFactory tmf = TrustManagerFactory .getInstance(trustManagerFactoryAlgorithm); tmf.init(ts); @@ -99,13 +102,13 @@ public class SSLContextFactory if (certAlias != null) { keyManagers = new KeyManager[] { new QpidClientX509KeyManager( - certAlias, keyStorePath, keyStorePassword, + certAlias, keyStorePath, keyStoreType, keyStorePassword, keyManagerFactoryAlgorithm) }; } else { final KeyStore ks = SSLUtil.getInitializedKeyStore( - keyStorePath, keyStorePassword); + keyStorePath, keyStorePassword, keyStoreType); char[] keyStoreCharPassword = keyStorePassword == null ? null : keyStorePassword.toCharArray(); // Set up key manager factory to use our key store diff --git a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java index 95a8d192c5..631ee98b94 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java @@ -1,4 +1,3 @@ -package org.apache.qpid.thread; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.thread; * under the License. * */ +package org.apache.qpid.thread; import java.lang.reflect.Constructor; diff --git a/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java b/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java index 4b8937acbd..c8f1cbfcba 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java +++ b/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java @@ -1,4 +1,3 @@ -package org.apache.qpid.thread; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.thread; * under the License. * */ +package org.apache.qpid.thread; public interface ThreadFactory diff --git a/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/java/common/src/main/java/org/apache/qpid/thread/Threading.java index 265b336157..d3ccc138dd 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/Threading.java +++ b/java/common/src/main/java/org/apache/qpid/thread/Threading.java @@ -1,4 +1,3 @@ -package org.apache.qpid.thread; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.thread; * under the License. * */ +package org.apache.qpid.thread; public final class Threading diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 2cd6a6e465..388e3442bf 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -41,6 +41,8 @@ import static org.apache.qpid.transport.Connection.State.OPENING; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; + +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -127,6 +129,9 @@ public class Connection extends ConnectionInvoker private final AtomicBoolean connectionLost = new AtomicBoolean(false); + private SocketAddress _remoteAddress; + private SocketAddress _localAddress; + public Connection() {} public void setConnectionDelegate(ConnectionDelegate delegate) @@ -228,6 +233,9 @@ public class Connection extends ConnectionInvoker } NetworkConnection network = transport.connect(settings, secureReceiver, null); + _remoteAddress = network.getRemoteAddress(); + _localAddress = network.getLocalAddress(); + final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender()); if(secureSender instanceof ConnectionListener) { @@ -701,4 +709,14 @@ public class Connection extends ConnectionInvoker ssn.notifyFailoverRequired(); } } + + public SocketAddress getRemoteAddress() + { + return _remoteAddress; + } + + public SocketAddress getLocalAddress() + { + return _localAddress; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index 084428d182..c90a11594c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -31,6 +31,7 @@ import static org.apache.qpid.configuration.ClientProperties.SEND_BUFFER_SIZE_PR import static org.apache.qpid.configuration.ClientProperties.LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME; import static org.apache.qpid.configuration.ClientProperties.LEGACY_SEND_BUFFER_SIZE_PROP_NAME; +import java.security.KeyStore; import java.util.Map; import javax.net.ssl.KeyManagerFactory; @@ -67,10 +68,12 @@ public class ConnectionSettings private boolean useSSL; private String keyStorePath = System.getProperty("javax.net.ssl.keyStore"); private String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword"); + private String keyStoreType = System.getProperty("javax.net.ssl.keyStoreType",KeyStore.getDefaultType()); private String keyManagerFactoryAlgorithm = QpidProperty.stringProperty(KeyManagerFactory.getDefaultAlgorithm(), QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME, QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME).get(); private String trustManagerFactoryAlgorithm = QpidProperty.stringProperty(TrustManagerFactory.getDefaultAlgorithm(), QPID_SSL_TRUST_MANAGER_FACTORY_ALGORITHM_PROP_NAME, QPID_SSL_TRUST_STORE_CERT_TYPE_PROP_NAME).get(); - private String trustStorePath = System.getProperty("javax.net.ssl.trustStore");; - private String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");; + private String trustStorePath = System.getProperty("javax.net.ssl.trustStore"); + private String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword"); + private String trustStoreType = System.getProperty("javax.net.ssl.trustStoreType",KeyStore.getDefaultType()); private String certAlias; private boolean verifyHostname; @@ -262,6 +265,16 @@ public class ConnectionSettings this.keyStorePassword = keyStorePassword; } + public void setKeyStoreType(String keyStoreType) + { + this.keyStoreType = keyStoreType; + } + + public String getKeyStoreType() + { + return keyStoreType; + } + public String getTrustStorePath() { return trustStorePath; @@ -322,6 +335,16 @@ public class ConnectionSettings this.trustManagerFactoryAlgorithm = trustManagerFactoryAlgorithm; } + public String getTrustStoreType() + { + return trustStoreType; + } + + public void setTrustStoreType(String trustStoreType) + { + this.trustStoreType = trustStoreType; + } + public int getReadBufferSize() { return readBufferSize; diff --git a/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java b/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java index 472beb6bb1..20d6f98fa6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java +++ b/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java @@ -25,17 +25,17 @@ import java.net.InetSocketAddress; /** * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned - * from here if the underlying implementation supports them. - */ -public interface NetworkTransportConfiguration -{ - // Taken from Socket - Boolean getTcpNoDelay(); + * from here if the underlying implementation supports them. + */ +public interface NetworkTransportConfiguration +{ + // Taken from Socket + Boolean getTcpNoDelay(); - // The amount of memory in bytes to allocate to the incoming buffer - Integer getReceiveBufferSize(); + // The amount of memory in bytes to allocate to the incoming buffer + Integer getReceiveBufferSize(); - // The amount of memory in bytes to allocate to the outgoing buffer + // The amount of memory in bytes to allocate to the outgoing buffer Integer getSendBufferSize(); Integer getPort(); @@ -47,4 +47,8 @@ public interface NetworkTransportConfiguration Integer getConnectorProcessors(); InetSocketAddress getAddress(); + + boolean needClientAuth(); + + boolean wantClientAuth(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index ec409d1c72..e9a7d51456 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -78,7 +78,7 @@ public class ServerDelegate extends ConnectionDelegate try { - SaslServer ss = createSaslServer(mechanism); + SaslServer ss = createSaslServer(conn, mechanism); if (ss == null) { conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, @@ -94,7 +94,7 @@ public class ServerDelegate extends ConnectionDelegate } } - protected SaslServer createSaslServer(String mechanism) + protected SaslServer createSaslServer(Connection conn, String mechanism) throws SaslException { SaslServer ss = Sasl.createSaslServer(mechanism, "AMQP", "localhost", null, null); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 110c73f718..95c3e4669f 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -94,8 +94,10 @@ public class Session extends SessionInvoker private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); - private final long blockedSendTimeout = Long.getLong("qpid.flow_control_wait_failure", timeout); - private long blockedSendReportingPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + private final long blockedSendTimeout = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE, + ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + private long blockedSendReportingPeriod = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, + ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); private boolean autoSync = false; @@ -210,6 +212,11 @@ public class Session extends SessionInvoker } } + protected State getState() + { + return this.state; + } + void setFlowControl(boolean value) { flowControl = value; @@ -307,7 +314,7 @@ public class Session extends SessionInvoker xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(), header.getNonStandardProperties())); } - + } else { @@ -616,7 +623,7 @@ public class Session extends SessionInvoker { acquireCredit(); } - + synchronized (commandsLock) { if (state == DETACHED && m.isUnreliable()) @@ -732,11 +739,11 @@ public class Session extends SessionInvoker { sessionCommandPoint(0, 0); } - + boolean replayTransfer = !closing && !transacted && m instanceof MessageTransfer && ! m.isUnreliable(); - + if ((replayTransfer) || m.hasCompletionListener()) { setCommand(next, m); @@ -833,7 +840,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { - checkFailoverRequired("Session sync was interrupted by failover."); + checkFailoverRequired("Session sync was interrupted by failover."); if(log.isDebugEnabled()) { log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); @@ -871,7 +878,7 @@ public class Session extends SessionInvoker { future = results.remove(command); } - + if (future != null) { future.set(result); @@ -1039,7 +1046,7 @@ public class Session extends SessionInvoker } } - protected void awaitClose() + protected void awaitClose() { Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED) @@ -1096,7 +1103,7 @@ public class Session extends SessionInvoker if(state == CLOSED) { - connection.removeSession(this); + connection.removeSession(this); listener.closed(this); } } @@ -1184,4 +1191,12 @@ public class Session extends SessionInvoker } } } + + /** + * An auxiliary method for test purposes only + */ + public boolean isFlowBlocked() + { + return flowControl && credit.availablePermits() == 0; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 2cc7c14f00..12c42d6643 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.transport.network; -import org.apache.qpid.transport.Sender; - import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.Principal; +import org.apache.qpid.transport.Sender; public interface NetworkConnection { @@ -46,4 +46,8 @@ public interface NetworkConnection void setMaxWriteIdle(int sec); void setMaxReadIdle(int sec); -}
\ No newline at end of file + + void setPeerPrincipal(Principal principal); + + Principal getPeerPrincipal(); +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 4046691779..2658296c5f 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -20,16 +20,15 @@ */ package org.apache.qpid.transport.network.io; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; - import java.net.Socket; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.security.Principal; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IoNetworkConnection implements NetworkConnection { @@ -38,6 +37,7 @@ public class IoNetworkConnection implements NetworkConnection private final long _timeout; private final IoSender _ioSender; private final IoReceiver _ioReceiver; + private Principal _principal; public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, int sendBufferSize, int receiveBufferSize, long timeout) @@ -97,4 +97,16 @@ public class IoNetworkConnection implements NetworkConnection // TODO implement support for setting heartbeating config in this way // Currently a socket timeout is used in IoSender } + + @Override + public void setPeerPrincipal(Principal principal) + { + _principal = principal; + } + + @Override + public Principal getPeerPrincipal() + { + return _principal; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 42c8334a5d..dfb318b80c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -27,10 +27,12 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; - +import java.security.Principal; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLServerSocket; import javax.net.ssl.SSLServerSocketFactory; - +import javax.net.ssl.SSLSocket; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.transport.ConnectionSettings; @@ -45,10 +47,10 @@ import org.slf4j.LoggerFactory; public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class); + private static final int TIMEOUT = 60000; private Socket _socket; private IoNetworkConnection _connection; - private long _timeout = 60000; private AcceptingThread _acceptor; public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext) @@ -73,7 +75,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet InetAddress address = InetAddress.getByName(settings.getHost()); - _socket.connect(new InetSocketAddress(address, settings.getPort())); + _socket.connect(new InetSocketAddress(address, settings.getPort()), TIMEOUT); } catch (SocketException e) { @@ -86,7 +88,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet try { - _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout); + _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT); _connection.start(); } catch(Exception e) @@ -167,6 +169,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); _serverSocket = socketFactory.createServerSocket(); + ((SSLServerSocket)_serverSocket).setNeedClientAuth(config.needClientAuth()); + ((SSLServerSocket)_serverSocket).setWantClientAuth(config.wantClientAuth()); + } _serverSocket.setReuseAddress(true); @@ -216,9 +221,23 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet socket.setSendBufferSize(sendBufferSize); socket.setReceiveBufferSize(receiveBufferSize); + ProtocolEngine engine = _factory.newProtocolEngine(); - NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout); + NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, TIMEOUT); + + if(_sslContext != null) + { + try + { + Principal peerPrincipal = ((SSLSocket) socket).getSession().getPeerPrincipal(); + connection.setPeerPrincipal(peerPrincipal); + } + catch(SSLPeerUnverifiedException e) + { + // ignore + } + } engine.setNetworkConnection(connection, connection.getSender()); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java index 442800c529..478355edc1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java @@ -78,9 +78,11 @@ public class SecurityLayerFactory sslCtx = SSLContextFactory .buildClientContext(settings.getTrustStorePath(), settings.getTrustStorePassword(), + settings.getTrustStoreType(), settings.getTrustManagerFactoryAlgorithm(), settings.getKeyStorePath(), settings.getKeyStorePassword(), + settings.getKeyStoreType(), settings.getKeyManagerFactoryAlgorithm(), settings.getCertAlias()); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java index 625e1a77c2..a90ea52202 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java @@ -1,4 +1,3 @@ -package org.apache.qpid.transport.network.security.sasl; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.transport.network.security.sasl; * under the License. * */ +package org.apache.qpid.transport.network.security.sasl; import org.apache.qpid.transport.Connection; diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java index a100b96412..59e9453454 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java @@ -1,4 +1,3 @@ -package org.apache.qpid.transport.network.security.sasl; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.transport.network.security.sasl; * under the License. * */ +package org.apache.qpid.transport.network.security.sasl; import org.apache.qpid.transport.Receiver; diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java index 61d54a8386..098f2fb20c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java @@ -1,4 +1,3 @@ -package org.apache.qpid.transport.network.security.sasl; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.transport.network.security.sasl; * under the License. * */ +package org.apache.qpid.transport.network.security.sasl; import org.apache.qpid.transport.Sender; diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java index 3ab028c8a8..0dccf37979 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java @@ -40,11 +40,11 @@ public class QpidClientX509KeyManager extends X509ExtendedKeyManager private X509ExtendedKeyManager delegate; private String alias; - public QpidClientX509KeyManager(String alias, String keyStorePath, + public QpidClientX509KeyManager(String alias, String keyStorePath, String keyStoreType, String keyStorePassword, String keyManagerFactoryAlgorithmName) throws GeneralSecurityException, IOException { this.alias = alias; - KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath,keyStorePassword); + KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath,keyStorePassword,keyStoreType); KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithmName); kmf.init(ks, keyStorePassword.toCharArray()); this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0]; diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java index 71a73db71f..ce7cc105a1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java @@ -125,9 +125,9 @@ public class SSLUtil return id.toString(); } - public static KeyStore getInitializedKeyStore(String storePath, String storePassword) throws GeneralSecurityException, IOException + public static KeyStore getInitializedKeyStore(String storePath, String storePassword, String keyStoreType) throws GeneralSecurityException, IOException { - KeyStore ks = KeyStore.getInstance("JKS"); + KeyStore ks = KeyStore.getInstance(keyStoreType); InputStream in = null; try { @@ -140,7 +140,7 @@ public class SSLUtil { in = Thread.currentThread().getContextClassLoader().getResourceAsStream(storePath); } - if (in == null) + if (in == null && !"PKCS11".equalsIgnoreCase(keyStoreType)) // PKCS11 will not require an explicit path { throw new IOException("Unable to load keystore resource: " + storePath); } diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java b/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java index fe7b01761b..939080e252 100644 --- a/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java @@ -1,4 +1,3 @@ -package org.apache.qpid.url; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.url; * under the License. * */ +package org.apache.qpid.url; import org.slf4j.Logger; diff --git a/java/common/src/main/java/org/apache/qpid/util/Serial.java b/java/common/src/main/java/org/apache/qpid/util/Serial.java index 451d5d60eb..287c094b88 100644 --- a/java/common/src/main/java/org/apache/qpid/util/Serial.java +++ b/java/common/src/main/java/org/apache/qpid/util/Serial.java @@ -1,4 +1,3 @@ -package org.apache.qpid.util; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.util; * under the License. * */ +package org.apache.qpid.util; import java.util.Comparator; diff --git a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java index 6540e053b1..cb820b333b 100644 --- a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java +++ b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.codec; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.codec; * under the License. * */ +package org.apache.qpid.codec; import junit.framework.TestCase; diff --git a/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java b/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java index 3a95ca330f..e0244e0581 100644 --- a/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java +++ b/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java @@ -1,4 +1,3 @@ -package org.apache.qpid.codec; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.codec; * under the License. * */ +package org.apache.qpid.codec; import org.apache.qpid.AMQException; diff --git a/java/common/src/test/java/org/apache/qpid/ssl/SSLContextFactoryTest.java b/java/common/src/test/java/org/apache/qpid/ssl/SSLContextFactoryTest.java index 69b04c9979..21b8871d9a 100644 --- a/java/common/src/test/java/org/apache/qpid/ssl/SSLContextFactoryTest.java +++ b/java/common/src/test/java/org/apache/qpid/ssl/SSLContextFactoryTest.java @@ -31,13 +31,14 @@ public class SSLContextFactoryTest extends QpidTestCase private static final String CLIENT_KEYSTORE_PATH = TEST_RESOURCES_DIR + "/ssl/java_client_keystore.jks"; private static final String CLIENT_TRUSTSTORE_PATH = TEST_RESOURCES_DIR + "/ssl/java_client_truststore.jks"; private static final String STORE_PASSWORD = "password"; + private static final String STORE_TYPE = "JKS"; private static final String DEFAULT_KEY_MANAGER_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm(); private static final String DEFAULT_TRUST_MANAGER_ALGORITHM = TrustManagerFactory.getDefaultAlgorithm(); private static final String CERT_ALIAS_APP1 = "app1"; public void testBuildServerContext() throws Exception { - SSLContext context = SSLContextFactory.buildServerContext(BROKER_KEYSTORE_PATH, STORE_PASSWORD, DEFAULT_KEY_MANAGER_ALGORITHM); + SSLContext context = SSLContextFactory.buildServerContext(BROKER_KEYSTORE_PATH, STORE_PASSWORD, STORE_TYPE, DEFAULT_KEY_MANAGER_ALGORITHM); assertNotNull("SSLContext should not be null", context); } @@ -45,7 +46,7 @@ public class SSLContextFactoryTest extends QpidTestCase { try { - SSLContextFactory.buildServerContext(BROKER_KEYSTORE_PATH, "sajdklsad", DEFAULT_KEY_MANAGER_ALGORITHM); + SSLContextFactory.buildServerContext(BROKER_KEYSTORE_PATH, "sajdklsad", STORE_TYPE, DEFAULT_KEY_MANAGER_ALGORITHM); fail("Exception was not thrown due to incorrect password"); } catch (IOException e) @@ -58,7 +59,7 @@ public class SSLContextFactoryTest extends QpidTestCase { try { - SSLContextFactory.buildClientContext("/path/to/nothing", STORE_PASSWORD, DEFAULT_TRUST_MANAGER_ALGORITHM, CLIENT_KEYSTORE_PATH, STORE_PASSWORD, DEFAULT_KEY_MANAGER_ALGORITHM, null); + SSLContextFactory.buildClientContext("/path/to/nothing", STORE_PASSWORD, STORE_TYPE, DEFAULT_TRUST_MANAGER_ALGORITHM, CLIENT_KEYSTORE_PATH, STORE_PASSWORD, STORE_TYPE, DEFAULT_KEY_MANAGER_ALGORITHM, null); fail("Exception was not thrown due to incorrect path"); } catch (IOException e) @@ -69,19 +70,19 @@ public class SSLContextFactoryTest extends QpidTestCase public void testBuildClientContextForSSLEncryptionOnly() throws Exception { - SSLContext context = SSLContextFactory.buildClientContext(CLIENT_TRUSTSTORE_PATH, STORE_PASSWORD, DEFAULT_TRUST_MANAGER_ALGORITHM, null, null, null, null); + SSLContext context = SSLContextFactory.buildClientContext(CLIENT_TRUSTSTORE_PATH, STORE_PASSWORD, STORE_TYPE, DEFAULT_TRUST_MANAGER_ALGORITHM, null, null, null, null, null); assertNotNull("SSLContext should not be null", context); } public void testBuildClientContextWithForClientAuth() throws Exception { - SSLContext context = SSLContextFactory.buildClientContext(CLIENT_TRUSTSTORE_PATH, STORE_PASSWORD, DEFAULT_TRUST_MANAGER_ALGORITHM, CLIENT_KEYSTORE_PATH, STORE_PASSWORD, DEFAULT_KEY_MANAGER_ALGORITHM, null); + SSLContext context = SSLContextFactory.buildClientContext(CLIENT_TRUSTSTORE_PATH, STORE_PASSWORD, STORE_TYPE, DEFAULT_TRUST_MANAGER_ALGORITHM, CLIENT_KEYSTORE_PATH, STORE_PASSWORD, STORE_TYPE, DEFAULT_KEY_MANAGER_ALGORITHM, null); assertNotNull("SSLContext should not be null", context); } public void testBuildClientContextWithForClientAuthWithCertAlias() throws Exception { - SSLContext context = SSLContextFactory.buildClientContext(CLIENT_TRUSTSTORE_PATH, STORE_PASSWORD, DEFAULT_TRUST_MANAGER_ALGORITHM, CLIENT_KEYSTORE_PATH, STORE_PASSWORD, DEFAULT_KEY_MANAGER_ALGORITHM, CERT_ALIAS_APP1); + SSLContext context = SSLContextFactory.buildClientContext(CLIENT_TRUSTSTORE_PATH, STORE_PASSWORD, STORE_TYPE, DEFAULT_TRUST_MANAGER_ALGORITHM, CLIENT_KEYSTORE_PATH, STORE_PASSWORD, STORE_TYPE, DEFAULT_KEY_MANAGER_ALGORITHM, CERT_ALIAS_APP1); assertNotNull("SSLContext should not be null", context); } } diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/PortHelper.java b/java/common/src/test/java/org/apache/qpid/test/utils/PortHelper.java new file mode 100644 index 0000000000..d3586c364f --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/test/utils/PortHelper.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.test.utils; + +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.util.Set; + +import org.apache.log4j.Logger; + +public class PortHelper +{ + private static final Logger _logger = Logger.getLogger(PortHelper.class); + + private static final int DEFAULT_TIMEOUT_MILLIS = 5000; + + private int timeout = DEFAULT_TIMEOUT_MILLIS; + + public void waitUntilPortsAreFree(Set<Integer> ports) + { + _logger.debug("Checking if ports " + ports + " are free..."); + + for (Integer port : ports) + { + waitUntilPortIsFree(port); + } + + _logger.debug("ports " + ports + " are free"); + } + + private void waitUntilPortIsFree(int port) + { + long startTime = System.currentTimeMillis(); + long deadline = startTime + timeout; + boolean alreadyFailed = false; + + while (true) + { + if (System.currentTimeMillis() > deadline) + { + throw new RuntimeException("Timed out after " + timeout + " ms waiting for port " + port + " to become available"); + } + + if (isPortAvailable(port)) + { + if(alreadyFailed) + { + _logger.debug("port " + port + " is now available"); + } + return; + } + else + { + alreadyFailed = true; + } + + try + { + Thread.sleep(500); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + } + + public boolean isPortAvailable(int port) + { + ServerSocket serverSocket = null; + DatagramSocket datagramSocket = null; + + try + { + serverSocket = new ServerSocket(port); + serverSocket.setReuseAddress(true); // ensures that the port is subsequently usable + datagramSocket = new DatagramSocket(port); + datagramSocket.setReuseAddress(true); + return true; + } + catch (IOException e) + { + _logger.debug("port " + port + " is not free"); + return false; + } + finally + { + if (serverSocket != null) + { + try + { + serverSocket.close(); + } + catch (IOException e) + { + throw new RuntimeException("Couldn't close port " + port + " that we created to check its availability", e); + } + } + if(datagramSocket != null) + { + datagramSocket.close(); + } + } + } + + public void setTimeout(int timeout) + { + this.timeout = timeout; + } +} diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java index cbf6caf141..ec06400b7d 100644 --- a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -29,8 +29,6 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.net.DatagramSocket; -import java.net.ServerSocket; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -110,8 +108,8 @@ public class QpidTestCase extends TestCase } } - protected static final String MS_FACTORY_CLASS_NAME_KEY = "messagestorefactory.class.name"; - protected static final String MEMORY_STORE_FACTORY_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStoreFactory"; + protected static final String MESSAGE_STORE_CLASS_NAME_KEY = "messagestore.class.name"; + protected static final String MEMORY_STORE_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStore"; private static List<String> _exclusionList; @@ -140,12 +138,12 @@ public class QpidTestCase extends TestCase } } - public String getTestProfileMessageStoreFactoryClassName() + public String getTestProfileMessageStoreClassName() { - final String storeFactoryClass = System.getProperty(MS_FACTORY_CLASS_NAME_KEY); - _logger.debug("MS_FACTORY_CLASS_NAME_KEY " + storeFactoryClass); + final String storeClass = System.getProperty(MESSAGE_STORE_CLASS_NAME_KEY); + _logger.debug("MESSAGE_STORE_CLASS_NAME_KEY " + storeClass); - return storeFactoryClass != null ? storeFactoryClass : MEMORY_STORE_FACTORY_CLASS_NAME ; + return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ; } @@ -166,9 +164,10 @@ public class QpidTestCase extends TestCase throw new IllegalArgumentException("Invalid start port: " + fromPort); } + PortHelper portHelper = new PortHelper(); for (int i = fromPort; i <= MAX_PORT_NUMBER; i++) { - if (available(i)) { + if (portHelper.isPortAvailable(i)) { return i; } } @@ -176,54 +175,6 @@ public class QpidTestCase extends TestCase throw new NoSuchElementException("Could not find an available port above " + fromPort); } - /** - * Checks to see if a specific port is available. - * - * @param port the port to check for availability - */ - private boolean available(int port) - { - if ((port < MIN_PORT_NUMBER) || (port > MAX_PORT_NUMBER)) - { - throw new IllegalArgumentException("Invalid start port: " + port); - } - - ServerSocket ss = null; - DatagramSocket ds = null; - try - { - ss = new ServerSocket(port); - ss.setReuseAddress(true); - ds = new DatagramSocket(port); - ds.setReuseAddress(true); - return true; - } - catch (IOException e) - { - } - finally - { - if (ds != null) - { - ds.close(); - } - - if (ss != null) - { - try - { - ss.close(); - } - catch (IOException e) - { - /* should not be thrown */ - } - } - } - - return false; - } - public int findFreePort() { return getNextAvailable(10000); diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/TestFileUtils.java b/java/common/src/test/java/org/apache/qpid/test/utils/TestFileUtils.java new file mode 100644 index 0000000000..056d11faaa --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/test/utils/TestFileUtils.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.utils; + +import java.io.File; + +import org.apache.qpid.util.FileUtils; + +/** + * Utility methods intended to be used in unit tests that manipulate files + */ +public class TestFileUtils +{ + private static final String SYSTEM_TMP_DIR = System.getProperty("java.io.tmpdir"); + + /** + * Create and return a temporary directory that will be deleted on exit. + */ + public static File createTestDirectory() + { + String dirNameStem = TestFileUtils.class.getSimpleName() + "-testDir"; + return createTestDirectory(dirNameStem, true); + } + + /** + * Creates an empty directory with a name like /tmp/dirNameStem-12345678 + */ + public static File createTestDirectory(String dirNameStem, boolean deleteOnExit) + { + File testDir = new File(SYSTEM_TMP_DIR, dirNameStem + "-" + System.currentTimeMillis()); + if (testDir.exists()) + { + FileUtils.delete(testDir, true); + } + + testDir.mkdirs(); + + if (deleteOnExit) + { + testDir.deleteOnExit(); + } + + return testDir; + } +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java index 548e8dab12..893f66c5ff 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java +++ b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.transport; +import java.security.Principal; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.network.NetworkConnection; @@ -71,6 +72,17 @@ public class TestNetworkConnection implements NetworkConnection } + @Override + public void setPeerPrincipal(Principal principal) + { + } + + @Override + public Principal getPeerPrincipal() + { + return null; + } + public void setMaxWriteIdle(int idleTime) { diff --git a/java/common/src/test/java/org/apache/qpid/util/SerialTest.java b/java/common/src/test/java/org/apache/qpid/util/SerialTest.java index ef8e7de61c..c3fcf73b38 100644 --- a/java/common/src/test/java/org/apache/qpid/util/SerialTest.java +++ b/java/common/src/test/java/org/apache/qpid/util/SerialTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.util; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.util; * under the License. * */ +package org.apache.qpid.util; import junit.framework.TestCase; |
