summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-12-11 10:11:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-12-11 10:11:03 +0000
commitebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa (patch)
treebc693f79d08d4d97a0295446fc3f9e9a794d0dea /qpid/java
parent5cce2b1fbd0d00486106d0cf9d734972f856ee6c (diff)
downloadqpid-python-ebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa.tar.gz
Allow the transport to inform the model that encryption is being used
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644586 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java287
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java3
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java5
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java82
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java14
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java4
13 files changed, 80 insertions, 367 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 0ecfb08110..9df4ad87e0 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -24,12 +24,8 @@ package org.apache.qpid.server.protocol;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.security.Principal;
import java.util.Set;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
@@ -43,20 +39,14 @@ import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
private final long _id;
- private final SSLContext _sslContext;
- private final boolean _wantClientAuth;
- private final boolean _needClientAuth;
private final AmqpPort<?> _port;
- private final Transport _transport;
+ private Transport _transport;
private final ProtocolEngineCreator[] _creators;
private final Runnable _onCloseTask;
@@ -70,9 +60,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(final Broker<?> broker,
- SSLContext sslContext,
- boolean wantClientAuth,
- boolean needClientAuth,
final Set<Protocol> supported,
final Protocol defaultSupportedReply,
AmqpPort<?> port,
@@ -91,15 +78,16 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_broker = broker;
_supported = supported;
_defaultSupportedReply = defaultSupportedReply;
- _sslContext = sslContext;
- _wantClientAuth = wantClientAuth;
- _needClientAuth = needClientAuth;
_port = port;
_transport = transport;
_creators = creators;
_onCloseTask = onCloseTask;
}
+ void setTransport(Transport transport)
+ {
+ _transport = transport;
+ }
public SocketAddress getRemoteAddress()
{
@@ -146,6 +134,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_delegate.readerIdle();
}
+ @Override
+ public void encryptedTransport()
+ {
+ _delegate.encryptedTransport();
+ }
+
public void received(ByteBuffer msg)
{
@@ -246,6 +240,12 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
+ @Override
+ public void encryptedTransport()
+ {
+
+ }
+
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
{
@@ -359,15 +359,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
}
-
- if(newDelegate == null && looksLikeSSL(headerBytes))
- {
- if(_sslContext != null)
- {
- newDelegate = new SslDelegateProtocolEngine();
- }
- }
-
// If no delegate is found then send back a supported protocol version id
if(newDelegate == null)
{
@@ -465,131 +456,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_network.close();
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
- {
-
- }
-
@Override
- public long getLastReadTime()
+ public void encryptedTransport()
{
- return _lastReadTime;
- }
-
- @Override
- public long getLastWriteTime()
- {
- return 0;
- }
- }
-
- private class SslDelegateProtocolEngine implements ServerProtocolEngine
- {
- private final MultiVersionProtocolEngine _decryptEngine;
- private final SSLEngine _engine;
- private final SSLReceiver _sslReceiver;
- private final SSLBufferingSender _sslSender;
- private long _lastReadTime;
-
- private SslDelegateProtocolEngine()
- {
-
- _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
- _defaultSupportedReply, _port, Transport.SSL, _id, _creators,
- null);
-
- _engine = _sslContext.createSSLEngine();
- _engine.setUseClientMode(false);
- SSLUtil.removeSSLv3Support(_engine);
-
- if(_needClientAuth)
+ if(_transport == Transport.TCP)
{
- _engine.setNeedClientAuth(true);
+ _transport = Transport.SSL;
}
- else if(_wantClientAuth)
- {
- _engine.setWantClientAuth(true);
- }
-
- SSLStatus sslStatus = new SSLStatus();
- _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
- _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
- _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender);
}
- @Override
- public void received(ByteBuffer msg)
- {
- _lastReadTime = System.currentTimeMillis();
- _sslReceiver.received(msg);
- _sslSender.send();
- _sslSender.flush();
- }
-
- @Override
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
{
- //TODO - Implement
- }
-
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _decryptEngine.getRemoteAddress();
- }
- @Override
- public SocketAddress getLocalAddress()
- {
- return _decryptEngine.getLocalAddress();
- }
-
- @Override
- public long getWrittenBytes()
- {
- return _decryptEngine.getWrittenBytes();
- }
-
- @Override
- public long getReadBytes()
- {
- return _decryptEngine.getReadBytes();
- }
-
- @Override
- public void closed()
- {
- _decryptEngine.closed();
- }
-
- @Override
- public void writerIdle()
- {
- _decryptEngine.writerIdle();
- }
-
- @Override
- public void readerIdle()
- {
- _decryptEngine.readerIdle();
- }
-
- @Override
- public void exception(Throwable t)
- {
- _decryptEngine.exception(t);
- }
-
- @Override
- public long getConnectionId()
- {
- return _decryptEngine.getConnectionId();
- }
-
- @Override
- public Subject getSubject()
- {
- return _decryptEngine.getSubject();
}
@Override
@@ -601,132 +479,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
@Override
public long getLastWriteTime()
{
- return _decryptEngine.getLastWriteTime();
+ return 0;
}
}
- private boolean looksLikeSSL(byte[] headerBytes)
- {
- return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
- }
-
- private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == 22 && // SSL Handshake
- (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[2] == 0 || // SSL 3.0
- headerBytes[2] == 1 || // TLS 1.0
- headerBytes[2] == 2 || // TLS 1.1
- headerBytes[2] == 3)) && // TLS1.2
- (headerBytes[5] == 1); // client_hello
- }
-
- private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == -128 &&
- headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[4] == 0 || // SSL 3.0
- headerBytes[4] == 1 || // TLS 1.0
- headerBytes[4] == 2 || // TLS 1.1
- headerBytes[4] == 3);
- }
-
-
- private static class SSLNetworkConnection implements NetworkConnection
- {
- private final NetworkConnection _network;
- private final SSLBufferingSender _sslSender;
- private final SSLEngine _engine;
- private Principal _principal;
- private boolean _principalChecked;
- private final Object _lock = new Object();
-
- public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
- SSLBufferingSender sslSender)
- {
- _engine = engine;
- _network = network;
- _sslSender = sslSender;
-
- }
-
- @Override
- public Sender<ByteBuffer> getSender()
- {
- return _sslSender;
- }
-
- @Override
- public void start()
- {
- _network.start();
- }
-
- @Override
- public void close()
- {
- _sslSender.close();
-
- _network.close();
- }
- @Override
- public SocketAddress getRemoteAddress()
- {
- return _network.getRemoteAddress();
- }
-
- @Override
- public SocketAddress getLocalAddress()
- {
- return _network.getLocalAddress();
- }
-
- @Override
- public void setMaxWriteIdle(int sec)
- {
- _network.setMaxWriteIdle(sec);
- }
-
- @Override
- public void setMaxReadIdle(int sec)
- {
- _network.setMaxReadIdle(sec);
- }
-
- @Override
- public Principal getPeerPrincipal()
- {
- synchronized (_lock)
- {
- if(!_principalChecked)
- {
- try
- {
- _principal = _engine.getSession().getPeerPrincipal();
- }
- catch (SSLPeerUnverifiedException e)
- {
- _principal = null;
- }
-
- _principalChecked = true;
- }
-
- return _principal;
- }
- }
-
- @Override
- public int getMaxReadIdle()
- {
- return _network.getMaxReadIdle();
- }
-
- @Override
- public int getMaxWriteIdle()
- {
- return _network.getMaxWriteIdle();
- }
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 5c704c5967..a51717e79e 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -27,10 +27,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import javax.net.ssl.SSLContext;
-
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.PortMessages;
import org.apache.qpid.server.logging.subjects.PortLogSubject;
import org.apache.qpid.server.model.Broker;
@@ -48,9 +45,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
private final Broker<?> _broker;
private final Set<Protocol> _supported;
private final Protocol _defaultSupportedReply;
- private final SSLContext _sslContext;
- private final boolean _wantClientAuth;
- private final boolean _needClientAuth;
private final AmqpPort<?> _port;
private final Transport _transport;
private final ProtocolEngineCreator[] _creators;
@@ -58,9 +52,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
_connectionCountDecrementingTask = new ConnectionCountDecrementingTask();
public MultiVersionProtocolEngineFactory(Broker<?> broker,
- SSLContext sslContext,
- boolean wantClientAuth,
- boolean needClientAuth,
final Set<Protocol> supportedVersions,
final Protocol defaultSupportedReply,
AmqpPort<?> port,
@@ -73,7 +64,6 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
}
_broker = broker;
- _sslContext = sslContext;
_supported = supportedVersions;
_defaultSupportedReply = defaultSupportedReply;
final List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>();
@@ -83,18 +73,16 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
}
Collections.sort(creators, new ProtocolEngineCreatorComparator());
_creators = creators.toArray(new ProtocolEngineCreator[creators.size()]);
- _wantClientAuth = wantClientAuth;
- _needClientAuth = needClientAuth;
_port = port;
_transport = transport;
}
- public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
+ public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
{
if(_port.canAcceptNewConnection(remoteSocketAddress))
{
_port.incrementConnectionCount();
- return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
+ return new MultiVersionProtocolEngine(_broker,
_supported, _defaultSupportedReply, _port, _transport,
ID_GENERATOR.getAndIncrement(),
_creators, _connectionCountDecrementingTask);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
index cd50998609..2fd10e4de4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
@@ -83,8 +83,7 @@ class TCPandSSLTransport implements AcceptingTransport
_networkTransport = new NonBlockingNetworkTransport();
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(
- _port.getParent(Broker.class), _sslContext,
- settings.wantClientAuth(), settings.needClientAuth(),
+ _port.getParent(Broker.class),
_supported,
_defaultSupportedProtocolReply,
_port,
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 9ea249bd47..0224f1b015 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -193,6 +193,11 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
return _writtenBytes;
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void writerIdle()
{
_connection.doHeartBeat();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 57ab22ad27..606649445d 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -1150,6 +1150,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void readerIdle()
{
Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>()
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 740b01e459..3bbfaac466 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -179,6 +179,11 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
//Todo
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
{
_network = network;
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index a194ac70f9..311894c1f4 100644
--- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -81,9 +81,7 @@ class WebSocketProvider implements AcceptingTransport
_supported = supported;
_defaultSupportedProtocolReply = defaultSupportedProtocolReply;
_factory = new MultiVersionProtocolEngineFactory(
- _port.getParent(Broker.class), null,
- _port.getWantClientAuth(),
- _port.getNeedClientAuth(),
+ _port.getParent(Broker.class),
_supported,
_defaultSupportedProtocolReply,
_port,
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c61469559a..0fe2ce232e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -316,6 +316,11 @@ public class AMQProtocolHandler implements ProtocolEngine
}
}
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
public void readerIdle()
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 6774d0a45a..cad5461d83 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.protocol;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
/**
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
* decodes it and then process the result.
@@ -56,7 +56,8 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport
// Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
void readerIdle();
+ void encryptedTransport();
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
-} \ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
index e47e33f748..cfa4f48c19 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -27,8 +27,6 @@ import java.security.Principal;
import java.util.Set;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,12 +57,14 @@ public class NonBlockingConnection implements NetworkConnection
Ticker ticker,
final Set<TransportEncryption> encryptionSet,
final SSLContext sslContext,
- final boolean wantClientAuth, final boolean needClientAuth)
+ final boolean wantClientAuth,
+ final boolean needClientAuth,
+ final Runnable onTransportEncryptionAction)
{
_socket = socket;
_timeout = timeout;
- _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth);
+ _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
index 38de9bda1f..c2635a8dfa 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -53,70 +53,6 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private AcceptingThread _acceptor;
-
-/*
- private SocketChannel _socketChannel;
- private NonBlockingConnection _connection;
-
- public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
- TransportActivity transportActivity)
- {
- int sendBufferSize = settings.getWriteBufferSize();
- int receiveBufferSize = settings.getReadBufferSize();
-
- try
- {
- _socketChannel = SocketChannel.open();
- _socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
- _socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, settings.isTcpNodelay());
- _socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
- _socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
-
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("SO_RCVBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_RCVBUF));
- LOGGER.debug("SO_SNDBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_SNDBUF));
- LOGGER.debug("TCP_NODELAY : " + _socketChannel.getOption(StandardSocketOptions.TCP_NODELAY));
- }
-
- InetAddress address = InetAddress.getByName(settings.getHost());
-
- _socketChannel.socket().connect(new InetSocketAddress(address, settings.getPort()),
- settings.getConnectTimeout());
- }
- catch (IOException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
-
- try
- {
- IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
- _connection = createNetworkConnection(_socketChannel, delegate, sendBufferSize, receiveBufferSize,
- TIMEOUT, ticker, _encryptionSet, _sslContext);
- ticker.setConnection(_connection);
- _connection.start();
- }
- catch(Exception e)
- {
- try
- {
- _socketChannel.close();
- }
- catch(IOException ioe)
- {
- //ignored, throw based on original exception
- }
-
- throw new TransportException("Error creating network connection", e);
- }
-
- return _connection;
- }
-
-*/
-
protected NonBlockingConnection createNetworkConnection(final SocketChannel socket,
final Receiver<ByteBuffer> engine,
final Integer sendBufferSize,
@@ -126,9 +62,10 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
final Set<TransportEncryption> encryptionSet,
final SSLContext sslContext,
final boolean wantClientAuth,
- final boolean needClientAuth)
+ final boolean needClientAuth,
+ final Runnable onTransportEncryptionAction)
{
- return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth);
+ return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
}
public void close()
@@ -242,7 +179,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
{
socket = _serverSocket.accept();
- ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
+ final ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
if(engine != null)
{
@@ -268,7 +205,16 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
_encryptionSet,
_sslContext,
_config.wantClientAuth(),
- _config.needClientAuth());
+ _config.needClientAuth(),
+ new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ engine.encryptedTransport();
+ }
+ });
connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index 616390cf70..dfc2697c79 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -68,6 +68,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
private final Ticker _ticker;
private final Set<TransportEncryption> _encryptionSet;
private final SSLContext _sslContext;
+ private final Runnable _onTransportEncryptionAction;
private ByteBuffer _netInputBuffer;
private SSLEngine _sslEngine;
@@ -84,7 +85,8 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
final Set<TransportEncryption> encryptionSet,
final SSLContext sslContext,
final boolean wantClientAuth,
- final boolean needClientAuth)
+ final boolean needClientAuth,
+ final Runnable onTransportEncryptionAction)
{
_socketChannel = socketChannel;
_receiver = receiver;
@@ -92,7 +94,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
_ticker = ticker;
_encryptionSet = encryptionSet;
_sslContext = sslContext;
-
+ _onTransportEncryptionAction = onTransportEncryptionAction;
if(encryptionSet.size() == 1)
{
@@ -113,7 +115,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
_sslEngine.setWantClientAuth(true);
}
_netInputBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
-
+ onTransportEncryptionAction.run();
}
try
@@ -200,7 +202,10 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
doRead();
boolean fullyWritten = doWrite();
- _socketChannel.register(_selector, fullyWritten ? SelectionKey.OP_READ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+ _socketChannel.register(_selector,
+ fullyWritten
+ ? SelectionKey.OP_READ
+ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
}
catch (IOException e)
{
@@ -416,6 +421,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
}
else
{
+ _onTransportEncryptionAction.run();
_netInputBuffer.compact();
doRead();
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index 007772e8be..e762ac1f12 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -161,7 +161,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l);
MultiVersionProtocolEngineFactory factory =
- new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port,
+ new MultiVersionProtocolEngineFactory(_broker, protocols, null, port,
org.apache.qpid.server.model.Transport.TCP);
//create a dummy to retrieve the 'current' ID number
@@ -215,7 +215,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
try
{
- new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, Protocol.AMQP_0_9, null,
+ new MultiVersionProtocolEngineFactory(_broker, versions, Protocol.AMQP_0_9, null,
org.apache.qpid.server.model.Transport.TCP);
fail("should not have been allowed to create the factory");
}