diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2008-11-04 22:40:06 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2008-11-04 22:40:06 +0000 |
| commit | 0775a64f49729537a207c540b8d8a2973c155d56 (patch) | |
| tree | 39780b49b8ecc85fb2bb25e491cc1f11c53448a7 | |
| parent | d93dbf035ab08a71165d9ca508fdc286d16b7b72 (diff) | |
| download | qpid-python-0775a64f49729537a207c540b8d8a2973c155d56.tar.gz | |
This check in is related to QPID-1296
Since SSLSocket didn't support shutdownInput method, I added an SSL layer on top of the TCP transport.
The SSL layer uses the SSLEngine class introduced in java 1.5.
This Layer can be used with minimum code changes to the the nio transport as well.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711455 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 469 insertions, 52 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java b/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java index 950279fff1..e142d21e06 100644 --- a/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java +++ b/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java @@ -41,38 +41,74 @@ public class SSLContextFactory { /** * Path to the Java keystore file */ - private String _keystorePath; + private String _keyStorePath; /** * Password for the keystore */ - private String _keystorePassword; + private String _keyStorePassword; /** - * Cert type to use + * Cert type to use in keystore */ - private String _certType; + private String _keyStoreCertType; /** + * Path to the Java truststore file + */ + private String _trustStorePath; + + /** + * Password for the truststore + */ + private String _trustStorePassword; + + /** + * Cert type to use in truststore + */ + private String _trustStoreCertType; + + + + public SSLContextFactory(String trustStorePath, String trustStorePassword, + String trustStoreCertType) + { + this(trustStorePath,trustStorePassword,trustStoreCertType, + trustStorePath,trustStorePassword,trustStoreCertType); + } + + /** * Create a factory instance * @param keystorePath path to the Java keystore file * @param keystorePassword password for the Java keystore * @param certType certificate type */ - public SSLContextFactory(String keystorePath, String keystorePassword, - String certType) + public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType, + String keyStorePath, String keyStorePassword, String keyStoreCertType) { - _keystorePath = keystorePath; - _keystorePassword = keystorePassword; - if (_keystorePassword.equals("none")) + + _trustStorePath = trustStorePath; + _trustStorePassword = trustStorePassword; + + if (_trustStorePassword.equals("none")) + { + _trustStorePassword = null; + } + _trustStoreCertType = trustStoreCertType; + + _keyStorePath = keyStorePath; + _keyStorePassword = keyStorePassword; + + if (_keyStorePassword.equals("none")) { - _keystorePassword = null; + _keyStorePassword = null; } - _certType = certType; - if (keystorePath == null) { - throw new IllegalArgumentException("Keystore path must be specified"); + _keyStoreCertType = keyStoreCertType; + + if (_trustStorePath == null) { + throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified"); } - if (certType == null) { + if (_trustStoreCertType == null) { throw new IllegalArgumentException("Cert type must be specified"); } } @@ -86,16 +122,18 @@ public class SSLContextFactory { public SSLContext buildServerContext() throws GeneralSecurityException, IOException { // Create keystore - KeyStore ks = getInitializedKeyStore(); + KeyStore ks = getInitializedKeyStore(_keyStorePath,_keyStorePassword); // Set up key manager factory to use our key store - KeyManagerFactory kmf = KeyManagerFactory.getInstance(_certType); - kmf.init(ks, _keystorePassword.toCharArray()); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(_keyStoreCertType); + kmf.init(ks, _keyStorePassword.toCharArray()); + KeyStore ts = getInitializedKeyStore(_trustStorePath,_trustStorePassword); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType); + tmf.init(ts); + // Initialize the SSLContext to work with our key managers. - SSLContext sslContext = SSLContext.getInstance("TLS"); - TrustManagerFactory tmf = TrustManagerFactory.getInstance(_certType); - tmf.init(ks); + SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); return sslContext; @@ -109,34 +147,34 @@ public class SSLContextFactory { */ public SSLContext buildClientContext() throws GeneralSecurityException, IOException { - KeyStore ks = getInitializedKeyStore(); - TrustManagerFactory tmf = TrustManagerFactory.getInstance(_certType); + KeyStore ks = getInitializedKeyStore(_trustStorePath,_trustStorePassword); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType); tmf.init(ks); SSLContext context = SSLContext.getInstance("TLS"); context.init(null, tmf.getTrustManagers(), null); return context; } - private KeyStore getInitializedKeyStore() throws GeneralSecurityException, IOException + private KeyStore getInitializedKeyStore(String storePath, String storePassword) throws GeneralSecurityException, IOException { KeyStore ks = KeyStore.getInstance("JKS"); InputStream in = null; try { - File f = new File(_keystorePath); + File f = new File(storePath); if (f.exists()) { in = new FileInputStream(f); } else { - in = Thread.currentThread().getContextClassLoader().getResourceAsStream(_keystorePath); + in = Thread.currentThread().getContextClassLoader().getResourceAsStream(storePath); } if (in == null) { - throw new IOException("Unable to load keystore resource: " + _keystorePath); + throw new IOException("Unable to load keystore resource: " + storePath); } - ks.load(in, _keystorePassword.toCharArray()); + ks.load(in, storePassword.toCharArray()); } finally { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java index c4559ae6b4..c3ec03a624 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java @@ -69,7 +69,7 @@ public class IoAcceptor<E> extends Thread try { Socket sock = socket.accept(); - IoTransport<E> transport = new IoTransport<E>(sock, binding); + IoTransport<E> transport = new IoTransport<E>(sock, binding,false); } catch (IOException e) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 8d52c3269a..3615461e9f 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -26,19 +26,20 @@ import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; -import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.ssl.SSLReceiver; +import org.apache.qpid.transport.network.ssl.SSLSender; import org.apache.qpid.transport.util.Logger; /** @@ -70,21 +71,53 @@ public final class IoTransport<E> ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); private Socket socket; - private IoSender sender; + private Sender<ByteBuffer> sender; private E endpoint; private IoReceiver receiver; private long timeout = 60000; - IoTransport(Socket socket, Binding<E,ByteBuffer> binding) + IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl) { this.socket = socket; - this.sender = new IoSender(this, 2*writeBufferSize, timeout); - this.endpoint = binding.endpoint(sender); - this.receiver = new IoReceiver(this, binding.receiver(endpoint), - 2*readBufferSize, timeout); + + if (ssl) + { + SSLEngine engine = null; + SSLContext sslCtx; + try + { + sslCtx = createSSLContext(); + } + catch (Exception e) + { + throw new TransportException("Error creating SSL Context", e); + } + + try + { + engine = sslCtx.createSSLEngine(); + engine.setUseClientMode(true); + } + catch(Exception e) + { + throw new TransportException("Error creating SSL Engine", e); + } + + this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout)); + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender), + 2*readBufferSize, timeout); + } + else + { + this.sender = new IoSender(this, 2*writeBufferSize, timeout); + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(this, binding.receiver(endpoint), + 2*readBufferSize, timeout); + } } - IoSender getSender() + Sender<ByteBuffer> getSender() { return sender; } @@ -103,8 +136,8 @@ public final class IoTransport<E> Binding<E,ByteBuffer> binding, boolean ssl) { - Socket socket = createSocket(host, port,ssl); - IoTransport<E> transport = new IoTransport<E>(socket, binding); + Socket socket = createSocket(host, port); + IoTransport<E> transport = new IoTransport<E>(socket, binding,ssl); return transport.endpoint; } @@ -144,21 +177,12 @@ public final class IoTransport<E> } - private static Socket createSocket(String host, int port, boolean ssl) + private static Socket createSocket(String host, int port) { try { InetAddress address = InetAddress.getByName(host); - Socket socket; - if (ssl) - { - SSLSocketFactory sslSocketfactory = (SSLSocketFactory) SSLSocketFactory.getDefault(); - socket = sslSocketfactory.createSocket(); - } - else - { - socket = new Socket(); - } + Socket socket = new Socket(); socket.setReuseAddress(true); socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); @@ -183,5 +207,23 @@ public final class IoTransport<E> throw new TransportException("Error connecting to broker", e); } } + + private SSLContext createSSLContext() throws Exception + { + String trustStorePath = System.getProperty("javax.net.ssl.trustStore"); + String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword"); + String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509"); + + String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath); + String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword); + String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509"); + + SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword, + trustStoreCertType,keyStorePath, + keyStorePassword,keyStoreCertType); + + return sslContextFactory.buildServerContext(); + + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java new file mode 100644 index 0000000000..58a340b9d6 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLReceiver.java @@ -0,0 +1,159 @@ +package org.apache.qpid.transport.network.ssl; + +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLEngineResult.Status; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.util.Logger; + +public class SSLReceiver implements Receiver<ByteBuffer> +{ + private Receiver<ByteBuffer> delegate; + private SSLEngine engine; + private SSLSender sender; + private int sslBufSize; + private ByteBuffer appData; + private ByteBuffer localBuffer; + private boolean dataCached = false; + private final Object notificationToken; + + private static final Logger log = Logger.get(SSLReceiver.class); + + public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender) + { + this.engine = engine; + this.delegate = delegate; + this.sender = sender; + this.sslBufSize = engine.getSession().getApplicationBufferSize(); + appData = ByteBuffer.allocate(sslBufSize); + localBuffer = ByteBuffer.allocate(sslBufSize); + notificationToken = sender.getNotificationToken(); + } + + public void closed() + { + delegate.closed(); + } + + public void exception(Throwable t) + { + delegate.exception(t); + } + + private ByteBuffer addPreviouslyUnreadData(ByteBuffer buf) + { + if (dataCached) + { + ByteBuffer b = ByteBuffer.allocate(localBuffer.remaining() + buf.remaining()); + b.put(localBuffer); + b.put(buf); + b.flip(); + dataCached = false; + return b; + } + else + { + return buf; + } + } + + public void received(ByteBuffer buf) + { + ByteBuffer netData = addPreviouslyUnreadData(buf); + + HandshakeStatus handshakeStatus; + Status status; + + while (netData.hasRemaining()) + { + try + { + SSLEngineResult result = engine.unwrap(netData, appData); + int read = result.bytesProduced(); + status = result.getStatus(); + handshakeStatus = result.getHandshakeStatus(); + + if (read > 0) + { + int limit = appData.limit(); + appData.limit(appData.position()); + appData.position(appData.position() - read); + + ByteBuffer data = appData.slice(); + + appData.limit(limit); + appData.position(appData.position() + read); + + delegate.received(data); + } + + + switch(status) + { + case CLOSED: + synchronized(notificationToken) + { + notificationToken.notifyAll(); + } + return; + + case BUFFER_OVERFLOW: + appData = ByteBuffer.allocate(sslBufSize); + continue; + + case BUFFER_UNDERFLOW: + localBuffer.clear(); + localBuffer.put(netData); + localBuffer.flip(); + dataCached = true; + break; + + case OK: + break; // do nothing + + default: + throw new IllegalStateException("SSLReceiver: Invalid State " + status); + } + + switch (handshakeStatus) + { + case NEED_UNWRAP: + if (netData.hasRemaining()) + { + continue; + } + break; + + case NEED_TASK: + sender.doTasks(); + handshakeStatus = engine.getHandshakeStatus(); + + case NEED_WRAP: + case FINISHED: + case NOT_HANDSHAKING: + synchronized(notificationToken) + { + notificationToken.notifyAll(); + } + break; + + default: + throw new IllegalStateException("SSLReceiver: Invalid State " + status); + } + + + } + catch(SSLException e) + { + throw new TransportException("Error in SSLReceiver",e); + } + + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java new file mode 100644 index 0000000000..a6e245eb49 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java @@ -0,0 +1,178 @@ +package org.apache.qpid.transport.network.ssl; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLEngineResult.Status; + +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.util.Logger; + +public class SSLSender implements Sender<ByteBuffer> +{ + private Sender<ByteBuffer> delegate; + private SSLEngine engine; + private int sslBufSize; + private ByteBuffer netData; + + private final Object engineState = new Object(); + private final AtomicBoolean closed = new AtomicBoolean(false); + + private static final Logger log = Logger.get(SSLSender.class); + + public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate) + { + this.engine = engine; + this.delegate = delegate; + sslBufSize = engine.getSession().getPacketBufferSize(); + netData = ByteBuffer.allocate(sslBufSize); + } + + public void close() + { + if (!closed.getAndSet(true)) + { + if (engine.isOutboundDone()) + { + return; + } + log.debug("Closing SSL connection"); + engine.closeOutbound(); + send(ByteBuffer.allocate(0)); + flush(); + while (!engine.isOutboundDone()) + { + synchronized(engineState) + { + try + { + engineState.wait(); + } + catch(InterruptedException e) + { + // pass + } + } + } + delegate.close(); + } + } + + public void flush() + { + delegate.flush(); + } + + public void send(ByteBuffer appData) + { + if (closed.get()) + { + throw new SenderException("SSL Sender is closed"); + } + + HandshakeStatus handshakeStatus; + Status status; + + while(appData.hasRemaining()) + { + + int read = 0; + try + { + SSLEngineResult result = engine.wrap(appData, netData); + read = result.bytesProduced(); + status = result.getStatus(); + handshakeStatus = result.getHandshakeStatus(); + + } + catch(SSLException e) + { + throw new SenderException("SSL, Error occurred while encrypting data",e); + } + + if(read > 0) + { + int limit = netData.limit(); + netData.limit(netData.position()); + netData.position(netData.position() - read); + + ByteBuffer data = netData.slice(); + + netData.limit(limit); + netData.position(netData.position() + read); + + delegate.send(data); + } + + switch(status) + { + case CLOSED: + throw new SenderException("SSLEngine is closed"); + + case BUFFER_OVERFLOW: + netData.clear(); + continue; + + case OK: + break; // do nothing + + default: + throw new IllegalStateException("SSLReceiver: Invalid State " + status); + } + + switch (handshakeStatus) + { + case NEED_WRAP: + if (netData.hasRemaining()) + { + continue; + } + + case NEED_TASK: + doTasks(); + break; + + case NEED_UNWRAP: + flush(); + synchronized(engineState) + { + try + { + engineState.wait(); + } + catch(InterruptedException e) + { + // pass + } + } + break; + + case FINISHED: + case NOT_HANDSHAKING: + break; //do nothing + + default: + throw new IllegalStateException("SSLReceiver: Invalid State " + status); + } + + } + } + + public void doTasks() + { + Runnable runnable; + while ((runnable = engine.getDelegatedTask()) != null) { + runnable.run(); + } + } + + public Object getNotificationToken() + { + return engineState; + } +} |
