summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java92
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java90
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java159
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java178
5 files changed, 469 insertions, 52 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java
index 950279fff1..e142d21e06 100644
--- a/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java
@@ -41,38 +41,74 @@ public class SSLContextFactory {
/**
* Path to the Java keystore file
*/
- private String _keystorePath;
+ private String _keyStorePath;
/**
* Password for the keystore
*/
- private String _keystorePassword;
+ private String _keyStorePassword;
/**
- * Cert type to use
+ * Cert type to use in keystore
*/
- private String _certType;
+ private String _keyStoreCertType;
/**
+ * Path to the Java truststore file
+ */
+ private String _trustStorePath;
+
+ /**
+ * Password for the truststore
+ */
+ private String _trustStorePassword;
+
+ /**
+ * Cert type to use in truststore
+ */
+ private String _trustStoreCertType;
+
+
+
+ public SSLContextFactory(String trustStorePath, String trustStorePassword,
+ String trustStoreCertType)
+ {
+ this(trustStorePath,trustStorePassword,trustStoreCertType,
+ trustStorePath,trustStorePassword,trustStoreCertType);
+ }
+
+ /**
* Create a factory instance
* @param keystorePath path to the Java keystore file
* @param keystorePassword password for the Java keystore
* @param certType certificate type
*/
- public SSLContextFactory(String keystorePath, String keystorePassword,
- String certType)
+ public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
+ String keyStorePath, String keyStorePassword, String keyStoreCertType)
{
- _keystorePath = keystorePath;
- _keystorePassword = keystorePassword;
- if (_keystorePassword.equals("none"))
+
+ _trustStorePath = trustStorePath;
+ _trustStorePassword = trustStorePassword;
+
+ if (_trustStorePassword.equals("none"))
+ {
+ _trustStorePassword = null;
+ }
+ _trustStoreCertType = trustStoreCertType;
+
+ _keyStorePath = keyStorePath;
+ _keyStorePassword = keyStorePassword;
+
+ if (_keyStorePassword.equals("none"))
{
- _keystorePassword = null;
+ _keyStorePassword = null;
}
- _certType = certType;
- if (keystorePath == null) {
- throw new IllegalArgumentException("Keystore path must be specified");
+ _keyStoreCertType = keyStoreCertType;
+
+ if (_trustStorePath == null) {
+ throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
}
- if (certType == null) {
+ if (_trustStoreCertType == null) {
throw new IllegalArgumentException("Cert type must be specified");
}
}
@@ -86,16 +122,18 @@ public class SSLContextFactory {
public SSLContext buildServerContext() throws GeneralSecurityException, IOException
{
// Create keystore
- KeyStore ks = getInitializedKeyStore();
+ KeyStore ks = getInitializedKeyStore(_keyStorePath,_keyStorePassword);
// Set up key manager factory to use our key store
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(_certType);
- kmf.init(ks, _keystorePassword.toCharArray());
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(_keyStoreCertType);
+ kmf.init(ks, _keyStorePassword.toCharArray());
+ KeyStore ts = getInitializedKeyStore(_trustStorePath,_trustStorePassword);
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
+ tmf.init(ts);
+
// Initialize the SSLContext to work with our key managers.
- SSLContext sslContext = SSLContext.getInstance("TLS");
- TrustManagerFactory tmf = TrustManagerFactory.getInstance(_certType);
- tmf.init(ks);
+ SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return sslContext;
@@ -109,34 +147,34 @@ public class SSLContextFactory {
*/
public SSLContext buildClientContext() throws GeneralSecurityException, IOException
{
- KeyStore ks = getInitializedKeyStore();
- TrustManagerFactory tmf = TrustManagerFactory.getInstance(_certType);
+ KeyStore ks = getInitializedKeyStore(_trustStorePath,_trustStorePassword);
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
tmf.init(ks);
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, tmf.getTrustManagers(), null);
return context;
}
- private KeyStore getInitializedKeyStore() throws GeneralSecurityException, IOException
+ private KeyStore getInitializedKeyStore(String storePath, String storePassword) throws GeneralSecurityException, IOException
{
KeyStore ks = KeyStore.getInstance("JKS");
InputStream in = null;
try
{
- File f = new File(_keystorePath);
+ File f = new File(storePath);
if (f.exists())
{
in = new FileInputStream(f);
}
else
{
- in = Thread.currentThread().getContextClassLoader().getResourceAsStream(_keystorePath);
+ in = Thread.currentThread().getContextClassLoader().getResourceAsStream(storePath);
}
if (in == null)
{
- throw new IOException("Unable to load keystore resource: " + _keystorePath);
+ throw new IOException("Unable to load keystore resource: " + storePath);
}
- ks.load(in, _keystorePassword.toCharArray());
+ ks.load(in, storePassword.toCharArray());
}
finally
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
index c4559ae6b4..c3ec03a624 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
@@ -69,7 +69,7 @@ public class IoAcceptor<E> extends Thread
try
{
Socket sock = socket.accept();
- IoTransport<E> transport = new IoTransport<E>(sock, binding);
+ IoTransport<E> transport = new IoTransport<E>(sock, binding,false);
}
catch (IOException e)
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
index 8d52c3269a..3615461e9f 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -26,19 +26,20 @@ import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
-import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Binding;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.ssl.SSLReceiver;
+import org.apache.qpid.transport.network.ssl.SSLSender;
import org.apache.qpid.transport.util.Logger;
/**
@@ -70,21 +71,53 @@ public final class IoTransport<E>
("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
private Socket socket;
- private IoSender sender;
+ private Sender<ByteBuffer> sender;
private E endpoint;
private IoReceiver receiver;
private long timeout = 60000;
- IoTransport(Socket socket, Binding<E,ByteBuffer> binding)
+ IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl)
{
this.socket = socket;
- this.sender = new IoSender(this, 2*writeBufferSize, timeout);
- this.endpoint = binding.endpoint(sender);
- this.receiver = new IoReceiver(this, binding.receiver(endpoint),
- 2*readBufferSize, timeout);
+
+ if (ssl)
+ {
+ SSLEngine engine = null;
+ SSLContext sslCtx;
+ try
+ {
+ sslCtx = createSSLContext();
+ }
+ catch (Exception e)
+ {
+ throw new TransportException("Error creating SSL Context", e);
+ }
+
+ try
+ {
+ engine = sslCtx.createSSLEngine();
+ engine.setUseClientMode(true);
+ }
+ catch(Exception e)
+ {
+ throw new TransportException("Error creating SSL Engine", e);
+ }
+
+ this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout));
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
+ 2*readBufferSize, timeout);
+ }
+ else
+ {
+ this.sender = new IoSender(this, 2*writeBufferSize, timeout);
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(this, binding.receiver(endpoint),
+ 2*readBufferSize, timeout);
+ }
}
- IoSender getSender()
+ Sender<ByteBuffer> getSender()
{
return sender;
}
@@ -103,8 +136,8 @@ public final class IoTransport<E>
Binding<E,ByteBuffer> binding,
boolean ssl)
{
- Socket socket = createSocket(host, port,ssl);
- IoTransport<E> transport = new IoTransport<E>(socket, binding);
+ Socket socket = createSocket(host, port);
+ IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl);
return transport.endpoint;
}
@@ -144,21 +177,12 @@ public final class IoTransport<E>
}
- private static Socket createSocket(String host, int port, boolean ssl)
+ private static Socket createSocket(String host, int port)
{
try
{
InetAddress address = InetAddress.getByName(host);
- Socket socket;
- if (ssl)
- {
- SSLSocketFactory sslSocketfactory = (SSLSocketFactory) SSLSocketFactory.getDefault();
- socket = sslSocketfactory.createSocket();
- }
- else
- {
- socket = new Socket();
- }
+ Socket socket = new Socket();
socket.setReuseAddress(true);
socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
@@ -183,5 +207,23 @@ public final class IoTransport<E>
throw new TransportException("Error connecting to broker", e);
}
}
+
+ private SSLContext createSSLContext() throws Exception
+ {
+ String trustStorePath = System.getProperty("javax.net.ssl.trustStore");
+ String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");
+ String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");
+
+ String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath);
+ String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword);
+ String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");
+
+ SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword,
+ trustStoreCertType,keyStorePath,
+ keyStorePassword,keyStoreCertType);
+
+ return sslContextFactory.buildServerContext();
+
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java
new file mode 100644
index 0000000000..58a340b9d6
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java
@@ -0,0 +1,159 @@
+package org.apache.qpid.transport.network.ssl;
+
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLReceiver implements Receiver<ByteBuffer>
+{
+ private Receiver<ByteBuffer> delegate;
+ private SSLEngine engine;
+ private SSLSender sender;
+ private int sslBufSize;
+ private ByteBuffer appData;
+ private ByteBuffer localBuffer;
+ private boolean dataCached = false;
+ private final Object notificationToken;
+
+ private static final Logger log = Logger.get(SSLReceiver.class);
+
+ public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender)
+ {
+ this.engine = engine;
+ this.delegate = delegate;
+ this.sender = sender;
+ this.sslBufSize = engine.getSession().getApplicationBufferSize();
+ appData = ByteBuffer.allocate(sslBufSize);
+ localBuffer = ByteBuffer.allocate(sslBufSize);
+ notificationToken = sender.getNotificationToken();
+ }
+
+ public void closed()
+ {
+ delegate.closed();
+ }
+
+ public void exception(Throwable t)
+ {
+ delegate.exception(t);
+ }
+
+ private ByteBuffer addPreviouslyUnreadData(ByteBuffer buf)
+ {
+ if (dataCached)
+ {
+ ByteBuffer b = ByteBuffer.allocate(localBuffer.remaining() + buf.remaining());
+ b.put(localBuffer);
+ b.put(buf);
+ b.flip();
+ dataCached = false;
+ return b;
+ }
+ else
+ {
+ return buf;
+ }
+ }
+
+ public void received(ByteBuffer buf)
+ {
+ ByteBuffer netData = addPreviouslyUnreadData(buf);
+
+ HandshakeStatus handshakeStatus;
+ Status status;
+
+ while (netData.hasRemaining())
+ {
+ try
+ {
+ SSLEngineResult result = engine.unwrap(netData, appData);
+ int read = result.bytesProduced();
+ status = result.getStatus();
+ handshakeStatus = result.getHandshakeStatus();
+
+ if (read > 0)
+ {
+ int limit = appData.limit();
+ appData.limit(appData.position());
+ appData.position(appData.position() - read);
+
+ ByteBuffer data = appData.slice();
+
+ appData.limit(limit);
+ appData.position(appData.position() + read);
+
+ delegate.received(data);
+ }
+
+
+ switch(status)
+ {
+ case CLOSED:
+ synchronized(notificationToken)
+ {
+ notificationToken.notifyAll();
+ }
+ return;
+
+ case BUFFER_OVERFLOW:
+ appData = ByteBuffer.allocate(sslBufSize);
+ continue;
+
+ case BUFFER_UNDERFLOW:
+ localBuffer.clear();
+ localBuffer.put(netData);
+ localBuffer.flip();
+ dataCached = true;
+ break;
+
+ case OK:
+ break; // do nothing
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+ switch (handshakeStatus)
+ {
+ case NEED_UNWRAP:
+ if (netData.hasRemaining())
+ {
+ continue;
+ }
+ break;
+
+ case NEED_TASK:
+ sender.doTasks();
+ handshakeStatus = engine.getHandshakeStatus();
+
+ case NEED_WRAP:
+ case FINISHED:
+ case NOT_HANDSHAKING:
+ synchronized(notificationToken)
+ {
+ notificationToken.notifyAll();
+ }
+ break;
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+
+ }
+ catch(SSLException e)
+ {
+ throw new TransportException("Error in SSLReceiver",e);
+ }
+
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
new file mode 100644
index 0000000000..a6e245eb49
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
@@ -0,0 +1,178 @@
+package org.apache.qpid.transport.network.ssl;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLSender implements Sender<ByteBuffer>
+{
+ private Sender<ByteBuffer> delegate;
+ private SSLEngine engine;
+ private int sslBufSize;
+ private ByteBuffer netData;
+
+ private final Object engineState = new Object();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private static final Logger log = Logger.get(SSLSender.class);
+
+ public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate)
+ {
+ this.engine = engine;
+ this.delegate = delegate;
+ sslBufSize = engine.getSession().getPacketBufferSize();
+ netData = ByteBuffer.allocate(sslBufSize);
+ }
+
+ public void close()
+ {
+ if (!closed.getAndSet(true))
+ {
+ if (engine.isOutboundDone())
+ {
+ return;
+ }
+ log.debug("Closing SSL connection");
+ engine.closeOutbound();
+ send(ByteBuffer.allocate(0));
+ flush();
+ while (!engine.isOutboundDone())
+ {
+ synchronized(engineState)
+ {
+ try
+ {
+ engineState.wait();
+ }
+ catch(InterruptedException e)
+ {
+ // pass
+ }
+ }
+ }
+ delegate.close();
+ }
+ }
+
+ public void flush()
+ {
+ delegate.flush();
+ }
+
+ public void send(ByteBuffer appData)
+ {
+ if (closed.get())
+ {
+ throw new SenderException("SSL Sender is closed");
+ }
+
+ HandshakeStatus handshakeStatus;
+ Status status;
+
+ while(appData.hasRemaining())
+ {
+
+ int read = 0;
+ try
+ {
+ SSLEngineResult result = engine.wrap(appData, netData);
+ read = result.bytesProduced();
+ status = result.getStatus();
+ handshakeStatus = result.getHandshakeStatus();
+
+ }
+ catch(SSLException e)
+ {
+ throw new SenderException("SSL, Error occurred while encrypting data",e);
+ }
+
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ delegate.send(data);
+ }
+
+ switch(status)
+ {
+ case CLOSED:
+ throw new SenderException("SSLEngine is closed");
+
+ case BUFFER_OVERFLOW:
+ netData.clear();
+ continue;
+
+ case OK:
+ break; // do nothing
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+ switch (handshakeStatus)
+ {
+ case NEED_WRAP:
+ if (netData.hasRemaining())
+ {
+ continue;
+ }
+
+ case NEED_TASK:
+ doTasks();
+ break;
+
+ case NEED_UNWRAP:
+ flush();
+ synchronized(engineState)
+ {
+ try
+ {
+ engineState.wait();
+ }
+ catch(InterruptedException e)
+ {
+ // pass
+ }
+ }
+ break;
+
+ case FINISHED:
+ case NOT_HANDSHAKING:
+ break; //do nothing
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+ }
+ }
+
+ public void doTasks()
+ {
+ Runnable runnable;
+ while ((runnable = engine.getDelegatedTask()) != null) {
+ runnable.run();
+ }
+ }
+
+ public Object getNotificationToken()
+ {
+ return engineState;
+ }
+}