diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-08-15 14:46:06 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-08-15 14:46:06 +0000 |
| commit | 67663fcc5f0d01f1df1a8cc006ec86725031f10a (patch) | |
| tree | 8a18a7ec88a9b99ab6269af27465820769db26ed /java/common | |
| parent | 034f02dac2f4645094633c77805b1e538e4d0583 (diff) | |
| download | qpid-python-67663fcc5f0d01f1df1a8cc006ec86725031f10a.tar.gz | |
QPID-3342: Regression when forming SSL connections.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1157866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
5 files changed, 109 insertions, 64 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SSLStatus.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SSLStatus.java new file mode 100644 index 0000000000..9db7dd557a --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SSLStatus.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.security; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class SSLStatus +{ + private final Object _sslLock = new Object(); + private final AtomicBoolean _sslErrorFlag = new AtomicBoolean(false); + + /** + * Lock used to coordinate the SSL sender with the SSL receiver. + * + * @return lock + */ + public Object getSslLock() + { + return _sslLock; + } + + public boolean getSslErrorFlag() + { + return _sslErrorFlag.get(); + } + + public void setSslErrorFlag() + { + _sslErrorFlag.set(true); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java index 69e4b52edb..3479aaa42a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java @@ -106,8 +106,8 @@ public class SecurityLayer class SSLSecurityLayer { - SSLEngine engine; - SSLSender sender; + final SSLEngine _engine; + final SSLStatus _sslStatus = new SSLStatus(); public SSLSecurityLayer() { @@ -123,8 +123,8 @@ public class SecurityLayer try { - engine = sslCtx.createSSLEngine(); - engine.setUseClientMode(true); + _engine = sslCtx.createSSLEngine(); + _engine.setUseClientMode(true); } catch(Exception e) { @@ -134,28 +134,21 @@ public class SecurityLayer public SSLSender sender(Sender<ByteBuffer> delegate) { - sender = new SSLSender(engine,delegate); + SSLSender sender = new SSLSender(_engine, delegate, _sslStatus); sender.setConnectionSettings(settings); return sender; } public SSLReceiver receiver(Receiver<ByteBuffer> delegate) { - if (sender == null) - { - throw new - IllegalStateException("SecurityLayer.sender method should be " + - "invoked before SecurityLayer.receiver"); - } - - SSLReceiver receiver = new SSLReceiver(engine,delegate,sender); + SSLReceiver receiver = new SSLReceiver(_engine, delegate, _sslStatus); receiver.setConnectionSettings(settings); return receiver; } public String getUserID() { - return SSLUtil.retriveIdentity(engine); + return SSLUtil.retriveIdentity(_engine); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java index e227a51729..878f0b2352 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java @@ -24,38 +24,37 @@ import java.nio.ByteBuffer; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLException; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; +import javax.net.ssl.SSLException; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.security.SSLStatus; import org.apache.qpid.transport.util.Logger; public class SSLReceiver implements Receiver<ByteBuffer> { - private Receiver<ByteBuffer> delegate; - private SSLEngine engine; - private SSLSender sender; - private int sslBufSize; + private static final Logger log = Logger.get(SSLReceiver.class); + + private final Receiver<ByteBuffer> delegate; + private final SSLEngine engine; + private final int sslBufSize; + private final ByteBuffer localBuffer; + private final SSLStatus _sslStatus; + private ConnectionSettings settings; private ByteBuffer appData; - private ByteBuffer localBuffer; private boolean dataCached = false; - private final Object notificationToken; - private ConnectionSettings settings; - - private static final Logger log = Logger.get(SSLReceiver.class); - public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender) + public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus) { this.engine = engine; this.delegate = delegate; - this.sender = sender; this.sslBufSize = engine.getSession().getApplicationBufferSize(); appData = ByteBuffer.allocate(sslBufSize); localBuffer = ByteBuffer.allocate(sslBufSize); - notificationToken = sender.getNotificationToken(); + _sslStatus = sslStatus; } public void setConnectionSettings(ConnectionSettings settings) @@ -102,9 +101,9 @@ public class SSLReceiver implements Receiver<ByteBuffer> try { SSLEngineResult result = engine.unwrap(netData, appData); - synchronized (notificationToken) + synchronized (_sslStatus.getSslLock()) { - notificationToken.notifyAll(); + _sslStatus.getSslLock().notifyAll(); } int read = result.bytesProduced(); @@ -129,9 +128,9 @@ public class SSLReceiver implements Receiver<ByteBuffer> switch(status) { case CLOSED: - synchronized(notificationToken) + synchronized(_sslStatus.getSslLock()) { - notificationToken.notifyAll(); + _sslStatus.getSslLock().notifyAll(); } return; @@ -163,7 +162,7 @@ public class SSLReceiver implements Receiver<ByteBuffer> break; case NEED_TASK: - sender.doTasks(); + doTasks(); handshakeStatus = engine.getHandshakeStatus(); case FINISHED: @@ -174,9 +173,9 @@ public class SSLReceiver implements Receiver<ByteBuffer> case NEED_WRAP: case NOT_HANDSHAKING: - synchronized(notificationToken) + synchronized(_sslStatus.getSslLock()) { - notificationToken.notifyAll(); + _sslStatus.getSslLock().notifyAll(); } break; @@ -189,14 +188,23 @@ public class SSLReceiver implements Receiver<ByteBuffer> catch(SSLException e) { log.error(e, "Error caught in SSLReceiver"); - sender.setErrorFlag(); - synchronized(notificationToken) + _sslStatus.setSslErrorFlag(); + synchronized(_sslStatus.getSslLock()) { - notificationToken.notifyAll(); + _sslStatus.getSslLock().notifyAll(); } exception(new TransportException("Error in SSLReceiver",e)); } } } + + private void doTasks() + { + Runnable runnable; + while ((runnable = engine.getDelegatedTask()) != null) { + runnable.run(); + } + } + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java index cd47a11825..5e0ee93cb8 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java @@ -31,30 +31,32 @@ import javax.net.ssl.SSLEngineResult.Status; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.security.SSLStatus; import org.apache.qpid.transport.util.Logger; public class SSLSender implements Sender<ByteBuffer> { - private Sender<ByteBuffer> delegate; - private SSLEngine engine; - private int sslBufSize; - private ByteBuffer netData; - private long timeout = 30000; + private static final Logger log = Logger.get(SSLSender.class); + + private final Sender<ByteBuffer> delegate; + private final SSLEngine engine; + private final int sslBufSize; + private final ByteBuffer netData; + private final long timeout; + private final SSLStatus _sslStatus; private ConnectionSettings settings; - private final Object engineState = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); - private final AtomicBoolean error = new AtomicBoolean(false); - private static final Logger log = Logger.get(SSLSender.class); - public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate) + public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus) { this.engine = engine; this.delegate = delegate; sslBufSize = engine.getSession().getPacketBufferSize(); netData = ByteBuffer.allocate(sslBufSize); timeout = Long.getLong("qpid.ssl_timeout", 60000); + _sslStatus = sslStatus; } public void setConnectionSettings(ConnectionSettings settings) @@ -83,13 +85,13 @@ public class SSLSender implements Sender<ByteBuffer> } - synchronized(engineState) + synchronized(_sslStatus.getSslLock()) { while (!engine.isOutboundDone()) { try { - engineState.wait(); + _sslStatus.getSslLock().wait(); } catch(InterruptedException e) { @@ -148,7 +150,7 @@ public class SSLSender implements Sender<ByteBuffer> HandshakeStatus handshakeStatus; Status status; - while(appData.hasRemaining() && !error.get()) + while(appData.hasRemaining() && !_sslStatus.getSslErrorFlag()) { int read = 0; try @@ -160,6 +162,7 @@ public class SSLSender implements Sender<ByteBuffer> } catch(SSLException e) { + // Should this set _sslError?? throw new SenderException("SSL, Error occurred while encrypting data",e); } @@ -207,7 +210,7 @@ public class SSLSender implements Sender<ByteBuffer> case NEED_UNWRAP: flush(); - synchronized(engineState) + synchronized(_sslStatus.getSslLock()) { switch (engine.getHandshakeStatus()) { @@ -215,7 +218,7 @@ public class SSLSender implements Sender<ByteBuffer> long start = System.currentTimeMillis(); try { - engineState.wait(timeout); + _sslStatus.getSslLock().wait(timeout); } catch(InterruptedException e) { @@ -249,7 +252,7 @@ public class SSLSender implements Sender<ByteBuffer> } } - public void doTasks() + private void doTasks() { Runnable runnable; while ((runnable = engine.getDelegatedTask()) != null) { @@ -257,16 +260,6 @@ public class SSLSender implements Sender<ByteBuffer> } } - public Object getNotificationToken() - { - return engineState; - } - - public void setErrorFlag() - { - error.set(true); - } - public void setIdleTimeout(int i) { delegate.setIdleTimeout(i); diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java index 796a845593..773d7bc117 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -29,6 +29,7 @@ import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.security.SSLStatus; import org.apache.qpid.transport.network.security.ssl.SSLReceiver; import org.apache.qpid.transport.network.security.ssl.SSLSender; import org.apache.qpid.transport.util.Logger; @@ -119,9 +120,10 @@ public final class IoTransport<E> } IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout); ios.initiate(); - this.sender = new SSLSender(engine,ios); + final SSLStatus sslStatus = new SSLStatus(); + this.sender = new SSLSender(engine,ios, sslStatus); this.endpoint = binding.endpoint(sender); - this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender), + this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),sslStatus), 2*readBufferSize, timeout); this.receiver.initiate(); ios.registerCloseListener(this.receiver); |
