diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-12-11 10:11:03 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-12-11 10:11:03 +0000 |
| commit | ebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa (patch) | |
| tree | bc693f79d08d4d97a0295446fc3f9e9a794d0dea /qpid/java/common/src/main | |
| parent | 5cce2b1fbd0d00486106d0cf9d734972f856ee6c (diff) | |
| download | qpid-python-ebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa.tar.gz | |
Allow the transport to inform the model that encryption is being used
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644586 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main')
4 files changed, 33 insertions, 80 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 6774d0a45a..cad5461d83 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.protocol; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.TransportActivity; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - /** * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received * decodes it and then process the result. @@ -56,7 +56,8 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) void readerIdle(); + void encryptedTransport(); public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender); -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java index e47e33f748..cfa4f48c19 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java @@ -27,8 +27,6 @@ import java.security.Principal; import java.util.Set; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,12 +57,14 @@ public class NonBlockingConnection implements NetworkConnection Ticker ticker, final Set<TransportEncryption> encryptionSet, final SSLContext sslContext, - final boolean wantClientAuth, final boolean needClientAuth) + final boolean wantClientAuth, + final boolean needClientAuth, + final Runnable onTransportEncryptionAction) { _socket = socket; _timeout = timeout; - _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth); + _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java index 38de9bda1f..c2635a8dfa 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java @@ -53,70 +53,6 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); private AcceptingThread _acceptor; - -/* - private SocketChannel _socketChannel; - private NonBlockingConnection _connection; - - public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, - TransportActivity transportActivity) - { - int sendBufferSize = settings.getWriteBufferSize(); - int receiveBufferSize = settings.getReadBufferSize(); - - try - { - _socketChannel = SocketChannel.open(); - _socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - _socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, settings.isTcpNodelay()); - _socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - _socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("SO_RCVBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_RCVBUF)); - LOGGER.debug("SO_SNDBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_SNDBUF)); - LOGGER.debug("TCP_NODELAY : " + _socketChannel.getOption(StandardSocketOptions.TCP_NODELAY)); - } - - InetAddress address = InetAddress.getByName(settings.getHost()); - - _socketChannel.socket().connect(new InetSocketAddress(address, settings.getPort()), - settings.getConnectTimeout()); - } - catch (IOException e) - { - throw new TransportException("Error connecting to broker", e); - } - - try - { - IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); - _connection = createNetworkConnection(_socketChannel, delegate, sendBufferSize, receiveBufferSize, - TIMEOUT, ticker, _encryptionSet, _sslContext); - ticker.setConnection(_connection); - _connection.start(); - } - catch(Exception e) - { - try - { - _socketChannel.close(); - } - catch(IOException ioe) - { - //ignored, throw based on original exception - } - - throw new TransportException("Error creating network connection", e); - } - - return _connection; - } - -*/ - protected NonBlockingConnection createNetworkConnection(final SocketChannel socket, final Receiver<ByteBuffer> engine, final Integer sendBufferSize, @@ -126,9 +62,10 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport final Set<TransportEncryption> encryptionSet, final SSLContext sslContext, final boolean wantClientAuth, - final boolean needClientAuth) + final boolean needClientAuth, + final Runnable onTransportEncryptionAction) { - return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth); + return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction); } public void close() @@ -242,7 +179,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport { socket = _serverSocket.accept(); - ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); + final ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); if(engine != null) { @@ -268,7 +205,16 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport _encryptionSet, _sslContext, _config.wantClientAuth(), - _config.needClientAuth()); + _config.needClientAuth(), + new Runnable() + { + + @Override + public void run() + { + engine.encryptedTransport(); + } + }); connection.setMaxReadIdle(HANSHAKE_TIMEOUT); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index 616390cf70..dfc2697c79 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -68,6 +68,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> private final Ticker _ticker; private final Set<TransportEncryption> _encryptionSet; private final SSLContext _sslContext; + private final Runnable _onTransportEncryptionAction; private ByteBuffer _netInputBuffer; private SSLEngine _sslEngine; @@ -84,7 +85,8 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> final Set<TransportEncryption> encryptionSet, final SSLContext sslContext, final boolean wantClientAuth, - final boolean needClientAuth) + final boolean needClientAuth, + final Runnable onTransportEncryptionAction) { _socketChannel = socketChannel; _receiver = receiver; @@ -92,7 +94,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> _ticker = ticker; _encryptionSet = encryptionSet; _sslContext = sslContext; - + _onTransportEncryptionAction = onTransportEncryptionAction; if(encryptionSet.size() == 1) { @@ -113,7 +115,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> _sslEngine.setWantClientAuth(true); } _netInputBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); - + onTransportEncryptionAction.run(); } try @@ -200,7 +202,10 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> doRead(); boolean fullyWritten = doWrite(); - _socketChannel.register(_selector, fullyWritten ? SelectionKey.OP_READ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ)); + _socketChannel.register(_selector, + fullyWritten + ? SelectionKey.OP_READ + : (SelectionKey.OP_WRITE | SelectionKey.OP_READ)); } catch (IOException e) { @@ -416,6 +421,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> } else { + _onTransportEncryptionAction.run(); _netInputBuffer.compact(); doRead(); } |
