diff options
Diffstat (limited to 'java')
5 files changed, 469 insertions, 52 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java index 950279fff1..e142d21e06 100644 --- a/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java +++ b/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java @@ -41,38 +41,74 @@ public class SSLContextFactory { /** * Path to the Java keystore file */ - private String _keystorePath; + private String _keyStorePath; /** * Password for the keystore */ - private String _keystorePassword; + private String _keyStorePassword; /** - * Cert type to use + * Cert type to use in keystore */ - private String _certType; + private String _keyStoreCertType; /** + * Path to the Java truststore file + */ + private String _trustStorePath; + + /** + * Password for the truststore + */ + private String _trustStorePassword; + + /** + * Cert type to use in truststore + */ + private String _trustStoreCertType; + + + + public SSLContextFactory(String trustStorePath, String trustStorePassword, + String trustStoreCertType) + { + this(trustStorePath,trustStorePassword,trustStoreCertType, + trustStorePath,trustStorePassword,trustStoreCertType); + } + + /** * Create a factory instance * @param keystorePath path to the Java keystore file * @param keystorePassword password for the Java keystore * @param certType certificate type */ - public SSLContextFactory(String keystorePath, String keystorePassword, - String certType) + public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType, + String keyStorePath, String keyStorePassword, String keyStoreCertType) { - _keystorePath = keystorePath; - _keystorePassword = keystorePassword; - if (_keystorePassword.equals("none")) + + _trustStorePath = trustStorePath; + _trustStorePassword = trustStorePassword; + + if (_trustStorePassword.equals("none")) + { + _trustStorePassword = null; + } + _trustStoreCertType = trustStoreCertType; + + _keyStorePath = keyStorePath; + _keyStorePassword = keyStorePassword; + + if (_keyStorePassword.equals("none")) { - _keystorePassword = null; + _keyStorePassword = null; } - _certType = certType; - if (keystorePath == null) { - throw new IllegalArgumentException("Keystore path must be specified"); + _keyStoreCertType = keyStoreCertType; + + if (_trustStorePath == null) { + throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified"); } - if (certType == null) { + if (_trustStoreCertType == null) { throw new IllegalArgumentException("Cert type must be specified"); } } @@ -86,16 +122,18 @@ public class SSLContextFactory { public SSLContext buildServerContext() throws GeneralSecurityException, IOException { // Create keystore - KeyStore ks = getInitializedKeyStore(); + KeyStore ks = getInitializedKeyStore(_keyStorePath,_keyStorePassword); // Set up key manager factory to use our key store - KeyManagerFactory kmf = KeyManagerFactory.getInstance(_certType); - kmf.init(ks, _keystorePassword.toCharArray()); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(_keyStoreCertType); + kmf.init(ks, _keyStorePassword.toCharArray()); + KeyStore ts = getInitializedKeyStore(_trustStorePath,_trustStorePassword); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType); + tmf.init(ts); + // Initialize the SSLContext to work with our key managers. - SSLContext sslContext = SSLContext.getInstance("TLS"); - TrustManagerFactory tmf = TrustManagerFactory.getInstance(_certType); - tmf.init(ks); + SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); return sslContext; @@ -109,34 +147,34 @@ public class SSLContextFactory { */ public SSLContext buildClientContext() throws GeneralSecurityException, IOException { - KeyStore ks = getInitializedKeyStore(); - TrustManagerFactory tmf = TrustManagerFactory.getInstance(_certType); + KeyStore ks = getInitializedKeyStore(_trustStorePath,_trustStorePassword); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType); tmf.init(ks); SSLContext context = SSLContext.getInstance("TLS"); context.init(null, tmf.getTrustManagers(), null); return context; } - private KeyStore getInitializedKeyStore() throws GeneralSecurityException, IOException + private KeyStore getInitializedKeyStore(String storePath, String storePassword) throws GeneralSecurityException, IOException { KeyStore ks = KeyStore.getInstance("JKS"); InputStream in = null; try { - File f = new File(_keystorePath); + File f = new File(storePath); if (f.exists()) { in = new FileInputStream(f); } else { - in = Thread.currentThread().getContextClassLoader().getResourceAsStream(_keystorePath); + in = Thread.currentThread().getContextClassLoader().getResourceAsStream(storePath); } if (in == null) { - throw new IOException("Unable to load keystore resource: " + _keystorePath); + throw new IOException("Unable to load keystore resource: " + storePath); } - ks.load(in, _keystorePassword.toCharArray()); + ks.load(in, storePassword.toCharArray()); } finally { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java index c4559ae6b4..c3ec03a624 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java @@ -69,7 +69,7 @@ public class IoAcceptor<E> extends Thread try { Socket sock = socket.accept(); - IoTransport<E> transport = new IoTransport<E>(sock, binding); + IoTransport<E> transport = new IoTransport<E>(sock, binding,false); } catch (IOException e) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 8d52c3269a..3615461e9f 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -26,19 +26,20 @@ import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; -import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.ssl.SSLReceiver; +import org.apache.qpid.transport.network.ssl.SSLSender; import org.apache.qpid.transport.util.Logger; /** @@ -70,21 +71,53 @@ public final class IoTransport<E> ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); private Socket socket; - private IoSender sender; + private Sender<ByteBuffer> sender; private E endpoint; private IoReceiver receiver; private long timeout = 60000; - IoTransport(Socket socket, Binding<E,ByteBuffer> binding) + IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl) { this.socket = socket; - this.sender = new IoSender(this, 2*writeBufferSize, timeout); - this.endpoint = binding.endpoint(sender); - this.receiver = new IoReceiver(this, binding.receiver(endpoint), - 2*readBufferSize, timeout); + + if (ssl) + { + SSLEngine engine = null; + SSLContext sslCtx; + try + { + sslCtx = createSSLContext(); + } + catch (Exception e) + { + throw new TransportException("Error creating SSL Context", e); + } + + try + { + engine = sslCtx.createSSLEngine(); + engine.setUseClientMode(true); + } + catch(Exception e) + { + throw new TransportException("Error creating SSL Engine", e); + } + + this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout)); + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender), + 2*readBufferSize, timeout); + } + else + { + this.sender = new IoSender(this, 2*writeBufferSize, timeout); + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(this, binding.receiver(endpoint), + 2*readBufferSize, timeout); + } } - IoSender getSender() + Sender<ByteBuffer> getSender() { return sender; } @@ -103,8 +136,8 @@ public final class IoTransport<E> Binding<E,ByteBuffer> binding, boolean ssl) { - Socket socket = createSocket(host, port,ssl); - IoTransport<E> transport = new IoTransport<E>(socket, binding); + Socket socket = createSocket(host, port); + IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl); return transport.endpoint; } @@ -144,21 +177,12 @@ public final class IoTransport<E> } - private static Socket createSocket(String host, int port, boolean ssl) + private static Socket createSocket(String host, int port) { try { InetAddress address = InetAddress.getByName(host); - Socket socket; - if (ssl) - { - SSLSocketFactory sslSocketfactory = (SSLSocketFactory) SSLSocketFactory.getDefault(); - socket = sslSocketfactory.createSocket(); - } - else - { - socket = new Socket(); - } + Socket socket = new Socket(); socket.setReuseAddress(true); socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); @@ -183,5 +207,23 @@ public final class IoTransport<E> throw new TransportException("Error connecting to broker", e); } } + + private SSLContext createSSLContext() throws Exception + { + String trustStorePath = System.getProperty("javax.net.ssl.trustStore"); + String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword"); + String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509"); + + String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath); + String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword); + String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509"); + + SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword, + trustStoreCertType,keyStorePath, + keyStorePassword,keyStoreCertType); + + return sslContextFactory.buildServerContext(); + + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java new file mode 100644 index 0000000000..58a340b9d6 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java @@ -0,0 +1,159 @@ +package org.apache.qpid.transport.network.ssl; + +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLEngineResult.Status; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.util.Logger; + +public class SSLReceiver implements Receiver<ByteBuffer> +{ + private Receiver<ByteBuffer> delegate; + private SSLEngine engine; + private SSLSender sender; + private int sslBufSize; + private ByteBuffer appData; + private ByteBuffer localBuffer; + private boolean dataCached = false; + private final Object notificationToken; + + private static final Logger log = Logger.get(SSLReceiver.class); + + public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender) + { + this.engine = engine; + this.delegate = delegate; + this.sender = sender; + this.sslBufSize = engine.getSession().getApplicationBufferSize(); + appData = ByteBuffer.allocate(sslBufSize); + localBuffer = ByteBuffer.allocate(sslBufSize); + notificationToken = sender.getNotificationToken(); + } + + public void closed() + { + delegate.closed(); + } + + public void exception(Throwable t) + { + delegate.exception(t); + } + + private ByteBuffer addPreviouslyUnreadData(ByteBuffer buf) + { + if (dataCached) + { + ByteBuffer b = ByteBuffer.allocate(localBuffer.remaining() + buf.remaining()); + b.put(localBuffer); + b.put(buf); + b.flip(); + dataCached = false; + return b; + } + else + { + return buf; + } + } + + public void received(ByteBuffer buf) + { + ByteBuffer netData = addPreviouslyUnreadData(buf); + + HandshakeStatus handshakeStatus; + Status status; + + while (netData.hasRemaining()) + { + try + { + SSLEngineResult result = engine.unwrap(netData, appData); + int read = result.bytesProduced(); + status = result.getStatus(); + handshakeStatus = result.getHandshakeStatus(); + + if (read > 0) + { + int limit = appData.limit(); + appData.limit(appData.position()); + appData.position(appData.position() - read); + + ByteBuffer data = appData.slice(); + + appData.limit(limit); + appData.position(appData.position() + read); + + delegate.received(data); + } + + + switch(status) + { + case CLOSED: + synchronized(notificationToken) + { + notificationToken.notifyAll(); + } + return; + + case BUFFER_OVERFLOW: + appData = ByteBuffer.allocate(sslBufSize); + continue; + + case BUFFER_UNDERFLOW: + localBuffer.clear(); + localBuffer.put(netData); + localBuffer.flip(); + dataCached = true; + break; + + case OK: + break; // do nothing + + default: + throw new IllegalStateException("SSLReceiver: Invalid State " + status); + } + + switch (handshakeStatus) + { + case NEED_UNWRAP: + if (netData.hasRemaining()) + { + continue; + } + break; + + case NEED_TASK: + sender.doTasks(); + handshakeStatus = engine.getHandshakeStatus(); + + case NEED_WRAP: + case FINISHED: + case NOT_HANDSHAKING: + synchronized(notificationToken) + { + notificationToken.notifyAll(); + } + break; + + default: + throw new IllegalStateException("SSLReceiver: Invalid State " + status); + } + + + } + catch(SSLException e) + { + throw new TransportException("Error in SSLReceiver",e); + } + + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java new file mode 100644 index 0000000000..a6e245eb49 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java @@ -0,0 +1,178 @@ +package org.apache.qpid.transport.network.ssl; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLEngineResult.Status; + +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.util.Logger; + +public class SSLSender implements Sender<ByteBuffer> +{ + private Sender<ByteBuffer> delegate; + private SSLEngine engine; + private int sslBufSize; + private ByteBuffer netData; + + private final Object engineState = new Object(); + private final AtomicBoolean closed = new AtomicBoolean(false); + + private static final Logger log = Logger.get(SSLSender.class); + + public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate) + { + this.engine = engine; + this.delegate = delegate; + sslBufSize = engine.getSession().getPacketBufferSize(); + netData = ByteBuffer.allocate(sslBufSize); + } + + public void close() + { + if (!closed.getAndSet(true)) + { + if (engine.isOutboundDone()) + { + return; + } + log.debug("Closing SSL connection"); + engine.closeOutbound(); + send(ByteBuffer.allocate(0)); + flush(); + while (!engine.isOutboundDone()) + { + synchronized(engineState) + { + try + { + engineState.wait(); + } + catch(InterruptedException e) + { + // pass + } + } + } + delegate.close(); + } + } + + public void flush() + { + delegate.flush(); + } + + public void send(ByteBuffer appData) + { + if (closed.get()) + { + throw new SenderException("SSL Sender is closed"); + } + + HandshakeStatus handshakeStatus; + Status status; + + while(appData.hasRemaining()) + { + + int read = 0; + try + { + SSLEngineResult result = engine.wrap(appData, netData); + read = result.bytesProduced(); + status = result.getStatus(); + handshakeStatus = result.getHandshakeStatus(); + + } + catch(SSLException e) + { + throw new SenderException("SSL, Error occurred while encrypting data",e); + } + + if(read > 0) + { + int limit = netData.limit(); + netData.limit(netData.position()); + netData.position(netData.position() - read); + + ByteBuffer data = netData.slice(); + + netData.limit(limit); + netData.position(netData.position() + read); + + delegate.send(data); + } + + switch(status) + { + case CLOSED: + throw new SenderException("SSLEngine is closed"); + + case BUFFER_OVERFLOW: + netData.clear(); + continue; + + case OK: + break; // do nothing + + default: + throw new IllegalStateException("SSLReceiver: Invalid State " + status); + } + + switch (handshakeStatus) + { + case NEED_WRAP: + if (netData.hasRemaining()) + { + continue; + } + + case NEED_TASK: + doTasks(); + break; + + case NEED_UNWRAP: + flush(); + synchronized(engineState) + { + try + { + engineState.wait(); + } + catch(InterruptedException e) + { + // pass + } + } + break; + + case FINISHED: + case NOT_HANDSHAKING: + break; //do nothing + + default: + throw new IllegalStateException("SSLReceiver: Invalid State " + status); + } + + } + } + + public void doTasks() + { + Runnable runnable; + while ((runnable = engine.getDelegatedTask()) != null) { + runnable.run(); + } + } + + public Object getNotificationToken() + { + return engineState; + } +} |
