summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-08-15 14:46:06 +0000
committerRobert Gemmell <robbie@apache.org>2011-08-15 14:46:06 +0000
commit67663fcc5f0d01f1df1a8cc006ec86725031f10a (patch)
tree8a18a7ec88a9b99ab6269af27465820769db26ed /java/common
parent034f02dac2f4645094633c77805b1e538e4d0583 (diff)
downloadqpid-python-67663fcc5f0d01f1df1a8cc006ec86725031f10a.tar.gz
QPID-3342: Regression when forming SSL connections.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1157866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/SSLStatus.java49
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java21
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java54
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java43
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java6
5 files changed, 109 insertions, 64 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SSLStatus.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SSLStatus.java
new file mode 100644
index 0000000000..9db7dd557a
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SSLStatus.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.security;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SSLStatus
+{
+ private final Object _sslLock = new Object();
+ private final AtomicBoolean _sslErrorFlag = new AtomicBoolean(false);
+
+ /**
+ * Lock used to coordinate the SSL sender with the SSL receiver.
+ *
+ * @return lock
+ */
+ public Object getSslLock()
+ {
+ return _sslLock;
+ }
+
+ public boolean getSslErrorFlag()
+ {
+ return _sslErrorFlag.get();
+ }
+
+ public void setSslErrorFlag()
+ {
+ _sslErrorFlag.set(true);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
index 69e4b52edb..3479aaa42a 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
@@ -106,8 +106,8 @@ public class SecurityLayer
class SSLSecurityLayer
{
- SSLEngine engine;
- SSLSender sender;
+ final SSLEngine _engine;
+ final SSLStatus _sslStatus = new SSLStatus();
public SSLSecurityLayer()
{
@@ -123,8 +123,8 @@ public class SecurityLayer
try
{
- engine = sslCtx.createSSLEngine();
- engine.setUseClientMode(true);
+ _engine = sslCtx.createSSLEngine();
+ _engine.setUseClientMode(true);
}
catch(Exception e)
{
@@ -134,28 +134,21 @@ public class SecurityLayer
public SSLSender sender(Sender<ByteBuffer> delegate)
{
- sender = new SSLSender(engine,delegate);
+ SSLSender sender = new SSLSender(_engine, delegate, _sslStatus);
sender.setConnectionSettings(settings);
return sender;
}
public SSLReceiver receiver(Receiver<ByteBuffer> delegate)
{
- if (sender == null)
- {
- throw new
- IllegalStateException("SecurityLayer.sender method should be " +
- "invoked before SecurityLayer.receiver");
- }
-
- SSLReceiver receiver = new SSLReceiver(engine,delegate,sender);
+ SSLReceiver receiver = new SSLReceiver(_engine, delegate, _sslStatus);
receiver.setConnectionSettings(settings);
return receiver;
}
public String getUserID()
{
- return SSLUtil.retriveIdentity(engine);
+ return SSLUtil.retriveIdentity(_engine);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
index e227a51729..878f0b2352 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
@@ -24,38 +24,37 @@ import java.nio.ByteBuffer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLException;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.security.SSLStatus;
import org.apache.qpid.transport.util.Logger;
public class SSLReceiver implements Receiver<ByteBuffer>
{
- private Receiver<ByteBuffer> delegate;
- private SSLEngine engine;
- private SSLSender sender;
- private int sslBufSize;
+ private static final Logger log = Logger.get(SSLReceiver.class);
+
+ private final Receiver<ByteBuffer> delegate;
+ private final SSLEngine engine;
+ private final int sslBufSize;
+ private final ByteBuffer localBuffer;
+ private final SSLStatus _sslStatus;
+ private ConnectionSettings settings;
private ByteBuffer appData;
- private ByteBuffer localBuffer;
private boolean dataCached = false;
- private final Object notificationToken;
- private ConnectionSettings settings;
-
- private static final Logger log = Logger.get(SSLReceiver.class);
- public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender)
+ public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
- this.sender = sender;
this.sslBufSize = engine.getSession().getApplicationBufferSize();
appData = ByteBuffer.allocate(sslBufSize);
localBuffer = ByteBuffer.allocate(sslBufSize);
- notificationToken = sender.getNotificationToken();
+ _sslStatus = sslStatus;
}
public void setConnectionSettings(ConnectionSettings settings)
@@ -102,9 +101,9 @@ public class SSLReceiver implements Receiver<ByteBuffer>
try
{
SSLEngineResult result = engine.unwrap(netData, appData);
- synchronized (notificationToken)
+ synchronized (_sslStatus.getSslLock())
{
- notificationToken.notifyAll();
+ _sslStatus.getSslLock().notifyAll();
}
int read = result.bytesProduced();
@@ -129,9 +128,9 @@ public class SSLReceiver implements Receiver<ByteBuffer>
switch(status)
{
case CLOSED:
- synchronized(notificationToken)
+ synchronized(_sslStatus.getSslLock())
{
- notificationToken.notifyAll();
+ _sslStatus.getSslLock().notifyAll();
}
return;
@@ -163,7 +162,7 @@ public class SSLReceiver implements Receiver<ByteBuffer>
break;
case NEED_TASK:
- sender.doTasks();
+ doTasks();
handshakeStatus = engine.getHandshakeStatus();
case FINISHED:
@@ -174,9 +173,9 @@ public class SSLReceiver implements Receiver<ByteBuffer>
case NEED_WRAP:
case NOT_HANDSHAKING:
- synchronized(notificationToken)
+ synchronized(_sslStatus.getSslLock())
{
- notificationToken.notifyAll();
+ _sslStatus.getSslLock().notifyAll();
}
break;
@@ -189,14 +188,23 @@ public class SSLReceiver implements Receiver<ByteBuffer>
catch(SSLException e)
{
log.error(e, "Error caught in SSLReceiver");
- sender.setErrorFlag();
- synchronized(notificationToken)
+ _sslStatus.setSslErrorFlag();
+ synchronized(_sslStatus.getSslLock())
{
- notificationToken.notifyAll();
+ _sslStatus.getSslLock().notifyAll();
}
exception(new TransportException("Error in SSLReceiver",e));
}
}
}
+
+ private void doTasks()
+ {
+ Runnable runnable;
+ while ((runnable = engine.getDelegatedTask()) != null) {
+ runnable.run();
+ }
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
index cd47a11825..5e0ee93cb8 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -31,30 +31,32 @@ import javax.net.ssl.SSLEngineResult.Status;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
import org.apache.qpid.transport.util.Logger;
public class SSLSender implements Sender<ByteBuffer>
{
- private Sender<ByteBuffer> delegate;
- private SSLEngine engine;
- private int sslBufSize;
- private ByteBuffer netData;
- private long timeout = 30000;
+ private static final Logger log = Logger.get(SSLSender.class);
+
+ private final Sender<ByteBuffer> delegate;
+ private final SSLEngine engine;
+ private final int sslBufSize;
+ private final ByteBuffer netData;
+ private final long timeout;
+ private final SSLStatus _sslStatus;
private ConnectionSettings settings;
- private final Object engineState = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final AtomicBoolean error = new AtomicBoolean(false);
- private static final Logger log = Logger.get(SSLSender.class);
- public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate)
+ public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
{
this.engine = engine;
this.delegate = delegate;
sslBufSize = engine.getSession().getPacketBufferSize();
netData = ByteBuffer.allocate(sslBufSize);
timeout = Long.getLong("qpid.ssl_timeout", 60000);
+ _sslStatus = sslStatus;
}
public void setConnectionSettings(ConnectionSettings settings)
@@ -83,13 +85,13 @@ public class SSLSender implements Sender<ByteBuffer>
}
- synchronized(engineState)
+ synchronized(_sslStatus.getSslLock())
{
while (!engine.isOutboundDone())
{
try
{
- engineState.wait();
+ _sslStatus.getSslLock().wait();
}
catch(InterruptedException e)
{
@@ -148,7 +150,7 @@ public class SSLSender implements Sender<ByteBuffer>
HandshakeStatus handshakeStatus;
Status status;
- while(appData.hasRemaining() && !error.get())
+ while(appData.hasRemaining() && !_sslStatus.getSslErrorFlag())
{
int read = 0;
try
@@ -160,6 +162,7 @@ public class SSLSender implements Sender<ByteBuffer>
}
catch(SSLException e)
{
+ // Should this set _sslError??
throw new SenderException("SSL, Error occurred while encrypting data",e);
}
@@ -207,7 +210,7 @@ public class SSLSender implements Sender<ByteBuffer>
case NEED_UNWRAP:
flush();
- synchronized(engineState)
+ synchronized(_sslStatus.getSslLock())
{
switch (engine.getHandshakeStatus())
{
@@ -215,7 +218,7 @@ public class SSLSender implements Sender<ByteBuffer>
long start = System.currentTimeMillis();
try
{
- engineState.wait(timeout);
+ _sslStatus.getSslLock().wait(timeout);
}
catch(InterruptedException e)
{
@@ -249,7 +252,7 @@ public class SSLSender implements Sender<ByteBuffer>
}
}
- public void doTasks()
+ private void doTasks()
{
Runnable runnable;
while ((runnable = engine.getDelegatedTask()) != null) {
@@ -257,16 +260,6 @@ public class SSLSender implements Sender<ByteBuffer>
}
}
- public Object getNotificationToken()
- {
- return engineState;
- }
-
- public void setErrorFlag()
- {
- error.set(true);
- }
-
public void setIdleTimeout(int i)
{
delegate.setIdleTimeout(i);
diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
index 796a845593..773d7bc117 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -29,6 +29,7 @@ import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Binding;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.security.SSLStatus;
import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
import org.apache.qpid.transport.network.security.ssl.SSLSender;
import org.apache.qpid.transport.util.Logger;
@@ -119,9 +120,10 @@ public final class IoTransport<E>
}
IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
ios.initiate();
- this.sender = new SSLSender(engine,ios);
+ final SSLStatus sslStatus = new SSLStatus();
+ this.sender = new SSLSender(engine,ios, sslStatus);
this.endpoint = binding.endpoint(sender);
- this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
+ this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),sslStatus),
2*readBufferSize, timeout);
this.receiver.initiate();
ios.registerCloseListener(this.receiver);