diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-12-11 10:11:03 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-12-11 10:11:03 +0000 |
| commit | ebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa (patch) | |
| tree | bc693f79d08d4d97a0295446fc3f9e9a794d0dea /qpid/java | |
| parent | 5cce2b1fbd0d00486106d0cf9d734972f856ee6c (diff) | |
| download | qpid-python-ebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa.tar.gz | |
Allow the transport to inform the model that encryption is being used
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644586 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
13 files changed, 80 insertions, 367 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 0ecfb08110..9df4ad87e0 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -24,12 +24,8 @@ package org.apache.qpid.server.protocol; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.security.Principal; import java.util.Set; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.auth.Subject; import org.apache.log4j.Logger; @@ -43,20 +39,14 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.network.security.ssl.SSLReceiver; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; public class MultiVersionProtocolEngine implements ServerProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); private final long _id; - private final SSLContext _sslContext; - private final boolean _wantClientAuth; - private final boolean _needClientAuth; private final AmqpPort<?> _port; - private final Transport _transport; + private Transport _transport; private final ProtocolEngineCreator[] _creators; private final Runnable _onCloseTask; @@ -70,9 +60,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); public MultiVersionProtocolEngine(final Broker<?> broker, - SSLContext sslContext, - boolean wantClientAuth, - boolean needClientAuth, final Set<Protocol> supported, final Protocol defaultSupportedReply, AmqpPort<?> port, @@ -91,15 +78,16 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _broker = broker; _supported = supported; _defaultSupportedReply = defaultSupportedReply; - _sslContext = sslContext; - _wantClientAuth = wantClientAuth; - _needClientAuth = needClientAuth; _port = port; _transport = transport; _creators = creators; _onCloseTask = onCloseTask; } + void setTransport(Transport transport) + { + _transport = transport; + } public SocketAddress getRemoteAddress() { @@ -146,6 +134,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _delegate.readerIdle(); } + @Override + public void encryptedTransport() + { + _delegate.encryptedTransport(); + } + public void received(ByteBuffer msg) { @@ -246,6 +240,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } + @Override + public void encryptedTransport() + { + + } + public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) { @@ -359,15 +359,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } } - - if(newDelegate == null && looksLikeSSL(headerBytes)) - { - if(_sslContext != null) - { - newDelegate = new SslDelegateProtocolEngine(); - } - } - // If no delegate is found then send back a supported protocol version id if(newDelegate == null) { @@ -465,131 +456,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _network.close(); } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) - { - - } - @Override - public long getLastReadTime() + public void encryptedTransport() { - return _lastReadTime; - } - - @Override - public long getLastWriteTime() - { - return 0; - } - } - - private class SslDelegateProtocolEngine implements ServerProtocolEngine - { - private final MultiVersionProtocolEngine _decryptEngine; - private final SSLEngine _engine; - private final SSLReceiver _sslReceiver; - private final SSLBufferingSender _sslSender; - private long _lastReadTime; - - private SslDelegateProtocolEngine() - { - - _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported, - _defaultSupportedReply, _port, Transport.SSL, _id, _creators, - null); - - _engine = _sslContext.createSSLEngine(); - _engine.setUseClientMode(false); - SSLUtil.removeSSLv3Support(_engine); - - if(_needClientAuth) + if(_transport == Transport.TCP) { - _engine.setNeedClientAuth(true); + _transport = Transport.SSL; } - else if(_wantClientAuth) - { - _engine.setWantClientAuth(true); - } - - SSLStatus sslStatus = new SSLStatus(); - _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus); - _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus); - _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender); } - @Override - public void received(ByteBuffer msg) - { - _lastReadTime = System.currentTimeMillis(); - _sslReceiver.received(msg); - _sslSender.send(); - _sslSender.flush(); - } - - @Override public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) { - //TODO - Implement - } - - @Override - public SocketAddress getRemoteAddress() - { - return _decryptEngine.getRemoteAddress(); - } - @Override - public SocketAddress getLocalAddress() - { - return _decryptEngine.getLocalAddress(); - } - - @Override - public long getWrittenBytes() - { - return _decryptEngine.getWrittenBytes(); - } - - @Override - public long getReadBytes() - { - return _decryptEngine.getReadBytes(); - } - - @Override - public void closed() - { - _decryptEngine.closed(); - } - - @Override - public void writerIdle() - { - _decryptEngine.writerIdle(); - } - - @Override - public void readerIdle() - { - _decryptEngine.readerIdle(); - } - - @Override - public void exception(Throwable t) - { - _decryptEngine.exception(t); - } - - @Override - public long getConnectionId() - { - return _decryptEngine.getConnectionId(); - } - - @Override - public Subject getSubject() - { - return _decryptEngine.getSubject(); } @Override @@ -601,132 +479,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine @Override public long getLastWriteTime() { - return _decryptEngine.getLastWriteTime(); + return 0; } } - private boolean looksLikeSSL(byte[] headerBytes) - { - return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); - } - - private boolean looksLikeSSLv3ClientHello(byte[] headerBytes) - { - return headerBytes[0] == 22 && // SSL Handshake - (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[2] == 0 || // SSL 3.0 - headerBytes[2] == 1 || // TLS 1.0 - headerBytes[2] == 2 || // TLS 1.1 - headerBytes[2] == 3)) && // TLS1.2 - (headerBytes[5] == 1); // client_hello - } - - private boolean looksLikeSSLv2ClientHello(byte[] headerBytes) - { - return headerBytes[0] == -128 && - headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[4] == 0 || // SSL 3.0 - headerBytes[4] == 1 || // TLS 1.0 - headerBytes[4] == 2 || // TLS 1.1 - headerBytes[4] == 3); - } - - - private static class SSLNetworkConnection implements NetworkConnection - { - private final NetworkConnection _network; - private final SSLBufferingSender _sslSender; - private final SSLEngine _engine; - private Principal _principal; - private boolean _principalChecked; - private final Object _lock = new Object(); - - public SSLNetworkConnection(SSLEngine engine, NetworkConnection network, - SSLBufferingSender sslSender) - { - _engine = engine; - _network = network; - _sslSender = sslSender; - - } - - @Override - public Sender<ByteBuffer> getSender() - { - return _sslSender; - } - - @Override - public void start() - { - _network.start(); - } - - @Override - public void close() - { - _sslSender.close(); - - _network.close(); - } - @Override - public SocketAddress getRemoteAddress() - { - return _network.getRemoteAddress(); - } - - @Override - public SocketAddress getLocalAddress() - { - return _network.getLocalAddress(); - } - - @Override - public void setMaxWriteIdle(int sec) - { - _network.setMaxWriteIdle(sec); - } - - @Override - public void setMaxReadIdle(int sec) - { - _network.setMaxReadIdle(sec); - } - - @Override - public Principal getPeerPrincipal() - { - synchronized (_lock) - { - if(!_principalChecked) - { - try - { - _principal = _engine.getSession().getPeerPrincipal(); - } - catch (SSLPeerUnverifiedException e) - { - _principal = null; - } - - _principalChecked = true; - } - - return _principal; - } - } - - @Override - public int getMaxReadIdle() - { - return _network.getMaxReadIdle(); - } - - @Override - public int getMaxWriteIdle() - { - return _network.getMaxWriteIdle(); - } - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 5c704c5967..a51717e79e 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -27,10 +27,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import javax.net.ssl.SSLContext; - import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.PortMessages; import org.apache.qpid.server.logging.subjects.PortLogSubject; import org.apache.qpid.server.model.Broker; @@ -48,9 +45,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory private final Broker<?> _broker; private final Set<Protocol> _supported; private final Protocol _defaultSupportedReply; - private final SSLContext _sslContext; - private final boolean _wantClientAuth; - private final boolean _needClientAuth; private final AmqpPort<?> _port; private final Transport _transport; private final ProtocolEngineCreator[] _creators; @@ -58,9 +52,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory _connectionCountDecrementingTask = new ConnectionCountDecrementingTask(); public MultiVersionProtocolEngineFactory(Broker<?> broker, - SSLContext sslContext, - boolean wantClientAuth, - boolean needClientAuth, final Set<Protocol> supportedVersions, final Protocol defaultSupportedReply, AmqpPort<?> port, @@ -73,7 +64,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory } _broker = broker; - _sslContext = sslContext; _supported = supportedVersions; _defaultSupportedReply = defaultSupportedReply; final List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>(); @@ -83,18 +73,16 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory } Collections.sort(creators, new ProtocolEngineCreatorComparator()); _creators = creators.toArray(new ProtocolEngineCreator[creators.size()]); - _wantClientAuth = wantClientAuth; - _needClientAuth = needClientAuth; _port = port; _transport = transport; } - public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress) + public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress) { if(_port.canAcceptNewConnection(remoteSocketAddress)) { _port.incrementConnectionCount(); - return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth, + return new MultiVersionProtocolEngine(_broker, _supported, _defaultSupportedReply, _port, _transport, ID_GENERATOR.getAndIncrement(), _creators, _connectionCountDecrementingTask); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java index cd50998609..2fd10e4de4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java @@ -83,8 +83,7 @@ class TCPandSSLTransport implements AcceptingTransport _networkTransport = new NonBlockingNetworkTransport(); final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory( - _port.getParent(Broker.class), _sslContext, - settings.wantClientAuth(), settings.needClientAuth(), + _port.getParent(Broker.class), _supported, _defaultSupportedProtocolReply, _port, diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 9ea249bd47..0224f1b015 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -193,6 +193,11 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return _writtenBytes; } + @Override + public void encryptedTransport() + { + } + public void writerIdle() { _connection.doHeartBeat(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 57ab22ad27..606649445d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -1150,6 +1150,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } + @Override + public void encryptedTransport() + { + } + public void readerIdle() { Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>() diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 740b01e459..3bbfaac466 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -179,6 +179,11 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut //Todo } + @Override + public void encryptedTransport() + { + } + public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender) { _network = network; diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index a194ac70f9..311894c1f4 100644 --- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -81,9 +81,7 @@ class WebSocketProvider implements AcceptingTransport _supported = supported; _defaultSupportedProtocolReply = defaultSupportedProtocolReply; _factory = new MultiVersionProtocolEngineFactory( - _port.getParent(Broker.class), null, - _port.getWantClientAuth(), - _port.getNeedClientAuth(), + _port.getParent(Broker.class), _supported, _defaultSupportedProtocolReply, _port, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c61469559a..0fe2ce232e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -316,6 +316,11 @@ public class AMQProtocolHandler implements ProtocolEngine } } + @Override + public void encryptedTransport() + { + } + public void readerIdle() { _logger.debug("Protocol Session [" + this + "] idle: reader"); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 6774d0a45a..cad5461d83 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -20,14 +20,14 @@ */ package org.apache.qpid.protocol; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.TransportActivity; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - /** * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received * decodes it and then process the result. @@ -56,7 +56,8 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) void readerIdle(); + void encryptedTransport(); public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender); -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java index e47e33f748..cfa4f48c19 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java @@ -27,8 +27,6 @@ import java.security.Principal; import java.util.Set; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,12 +57,14 @@ public class NonBlockingConnection implements NetworkConnection Ticker ticker, final Set<TransportEncryption> encryptionSet, final SSLContext sslContext, - final boolean wantClientAuth, final boolean needClientAuth) + final boolean wantClientAuth, + final boolean needClientAuth, + final Runnable onTransportEncryptionAction) { _socket = socket; _timeout = timeout; - _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth); + _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java index 38de9bda1f..c2635a8dfa 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java @@ -53,70 +53,6 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); private AcceptingThread _acceptor; - -/* - private SocketChannel _socketChannel; - private NonBlockingConnection _connection; - - public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, - TransportActivity transportActivity) - { - int sendBufferSize = settings.getWriteBufferSize(); - int receiveBufferSize = settings.getReadBufferSize(); - - try - { - _socketChannel = SocketChannel.open(); - _socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - _socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, settings.isTcpNodelay()); - _socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - _socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("SO_RCVBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_RCVBUF)); - LOGGER.debug("SO_SNDBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_SNDBUF)); - LOGGER.debug("TCP_NODELAY : " + _socketChannel.getOption(StandardSocketOptions.TCP_NODELAY)); - } - - InetAddress address = InetAddress.getByName(settings.getHost()); - - _socketChannel.socket().connect(new InetSocketAddress(address, settings.getPort()), - settings.getConnectTimeout()); - } - catch (IOException e) - { - throw new TransportException("Error connecting to broker", e); - } - - try - { - IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); - _connection = createNetworkConnection(_socketChannel, delegate, sendBufferSize, receiveBufferSize, - TIMEOUT, ticker, _encryptionSet, _sslContext); - ticker.setConnection(_connection); - _connection.start(); - } - catch(Exception e) - { - try - { - _socketChannel.close(); - } - catch(IOException ioe) - { - //ignored, throw based on original exception - } - - throw new TransportException("Error creating network connection", e); - } - - return _connection; - } - -*/ - protected NonBlockingConnection createNetworkConnection(final SocketChannel socket, final Receiver<ByteBuffer> engine, final Integer sendBufferSize, @@ -126,9 +62,10 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport final Set<TransportEncryption> encryptionSet, final SSLContext sslContext, final boolean wantClientAuth, - final boolean needClientAuth) + final boolean needClientAuth, + final Runnable onTransportEncryptionAction) { - return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth); + return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction); } public void close() @@ -242,7 +179,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport { socket = _serverSocket.accept(); - ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); + final ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); if(engine != null) { @@ -268,7 +205,16 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport _encryptionSet, _sslContext, _config.wantClientAuth(), - _config.needClientAuth()); + _config.needClientAuth(), + new Runnable() + { + + @Override + public void run() + { + engine.encryptedTransport(); + } + }); connection.setMaxReadIdle(HANSHAKE_TIMEOUT); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index 616390cf70..dfc2697c79 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -68,6 +68,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> private final Ticker _ticker; private final Set<TransportEncryption> _encryptionSet; private final SSLContext _sslContext; + private final Runnable _onTransportEncryptionAction; private ByteBuffer _netInputBuffer; private SSLEngine _sslEngine; @@ -84,7 +85,8 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> final Set<TransportEncryption> encryptionSet, final SSLContext sslContext, final boolean wantClientAuth, - final boolean needClientAuth) + final boolean needClientAuth, + final Runnable onTransportEncryptionAction) { _socketChannel = socketChannel; _receiver = receiver; @@ -92,7 +94,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> _ticker = ticker; _encryptionSet = encryptionSet; _sslContext = sslContext; - + _onTransportEncryptionAction = onTransportEncryptionAction; if(encryptionSet.size() == 1) { @@ -113,7 +115,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> _sslEngine.setWantClientAuth(true); } _netInputBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); - + onTransportEncryptionAction.run(); } try @@ -200,7 +202,10 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> doRead(); boolean fullyWritten = doWrite(); - _socketChannel.register(_selector, fullyWritten ? SelectionKey.OP_READ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ)); + _socketChannel.register(_selector, + fullyWritten + ? SelectionKey.OP_READ + : (SelectionKey.OP_WRITE | SelectionKey.OP_READ)); } catch (IOException e) { @@ -416,6 +421,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> } else { + _onTransportEncryptionAction.run(); _netInputBuffer.compact(); doRead(); } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index 007772e8be..e762ac1f12 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -161,7 +161,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l); MultiVersionProtocolEngineFactory factory = - new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port, + new MultiVersionProtocolEngineFactory(_broker, protocols, null, port, org.apache.qpid.server.model.Transport.TCP); //create a dummy to retrieve the 'current' ID number @@ -215,7 +215,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase try { - new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, Protocol.AMQP_0_9, null, + new MultiVersionProtocolEngineFactory(_broker, versions, Protocol.AMQP_0_9, null, org.apache.qpid.server.model.Transport.TCP); fail("should not have been allowed to create the factory"); } |
