summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-12-11 10:11:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-12-11 10:11:03 +0000
commitebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa (patch)
treebc693f79d08d4d97a0295446fc3f9e9a794d0dea /qpid/java/common/src/main
parent5cce2b1fbd0d00486106d0cf9d734972f856ee6c (diff)
downloadqpid-python-ebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa.tar.gz
Allow the transport to inform the model that encryption is being used
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644586 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java82
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java14
4 files changed, 33 insertions, 80 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 6774d0a45a..cad5461d83 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.protocol;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
/**
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
* decodes it and then process the result.
@@ -56,7 +56,8 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>, Transport
// Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
void readerIdle();
+ void encryptedTransport();
public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
-} \ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
index e47e33f748..cfa4f48c19 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -27,8 +27,6 @@ import java.security.Principal;
import java.util.Set;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,12 +57,14 @@ public class NonBlockingConnection implements NetworkConnection
Ticker ticker,
final Set<TransportEncryption> encryptionSet,
final SSLContext sslContext,
- final boolean wantClientAuth, final boolean needClientAuth)
+ final boolean wantClientAuth,
+ final boolean needClientAuth,
+ final Runnable onTransportEncryptionAction)
{
_socket = socket;
_timeout = timeout;
- _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth);
+ _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
index 38de9bda1f..c2635a8dfa 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -53,70 +53,6 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private AcceptingThread _acceptor;
-
-/*
- private SocketChannel _socketChannel;
- private NonBlockingConnection _connection;
-
- public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
- TransportActivity transportActivity)
- {
- int sendBufferSize = settings.getWriteBufferSize();
- int receiveBufferSize = settings.getReadBufferSize();
-
- try
- {
- _socketChannel = SocketChannel.open();
- _socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
- _socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, settings.isTcpNodelay());
- _socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
- _socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
-
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("SO_RCVBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_RCVBUF));
- LOGGER.debug("SO_SNDBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_SNDBUF));
- LOGGER.debug("TCP_NODELAY : " + _socketChannel.getOption(StandardSocketOptions.TCP_NODELAY));
- }
-
- InetAddress address = InetAddress.getByName(settings.getHost());
-
- _socketChannel.socket().connect(new InetSocketAddress(address, settings.getPort()),
- settings.getConnectTimeout());
- }
- catch (IOException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
-
- try
- {
- IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
- _connection = createNetworkConnection(_socketChannel, delegate, sendBufferSize, receiveBufferSize,
- TIMEOUT, ticker, _encryptionSet, _sslContext);
- ticker.setConnection(_connection);
- _connection.start();
- }
- catch(Exception e)
- {
- try
- {
- _socketChannel.close();
- }
- catch(IOException ioe)
- {
- //ignored, throw based on original exception
- }
-
- throw new TransportException("Error creating network connection", e);
- }
-
- return _connection;
- }
-
-*/
-
protected NonBlockingConnection createNetworkConnection(final SocketChannel socket,
final Receiver<ByteBuffer> engine,
final Integer sendBufferSize,
@@ -126,9 +62,10 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
final Set<TransportEncryption> encryptionSet,
final SSLContext sslContext,
final boolean wantClientAuth,
- final boolean needClientAuth)
+ final boolean needClientAuth,
+ final Runnable onTransportEncryptionAction)
{
- return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth);
+ return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
}
public void close()
@@ -242,7 +179,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
{
socket = _serverSocket.accept();
- ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
+ final ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
if(engine != null)
{
@@ -268,7 +205,16 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
_encryptionSet,
_sslContext,
_config.wantClientAuth(),
- _config.needClientAuth());
+ _config.needClientAuth(),
+ new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ engine.encryptedTransport();
+ }
+ });
connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index 616390cf70..dfc2697c79 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -68,6 +68,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
private final Ticker _ticker;
private final Set<TransportEncryption> _encryptionSet;
private final SSLContext _sslContext;
+ private final Runnable _onTransportEncryptionAction;
private ByteBuffer _netInputBuffer;
private SSLEngine _sslEngine;
@@ -84,7 +85,8 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
final Set<TransportEncryption> encryptionSet,
final SSLContext sslContext,
final boolean wantClientAuth,
- final boolean needClientAuth)
+ final boolean needClientAuth,
+ final Runnable onTransportEncryptionAction)
{
_socketChannel = socketChannel;
_receiver = receiver;
@@ -92,7 +94,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
_ticker = ticker;
_encryptionSet = encryptionSet;
_sslContext = sslContext;
-
+ _onTransportEncryptionAction = onTransportEncryptionAction;
if(encryptionSet.size() == 1)
{
@@ -113,7 +115,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
_sslEngine.setWantClientAuth(true);
}
_netInputBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
-
+ onTransportEncryptionAction.run();
}
try
@@ -200,7 +202,10 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
doRead();
boolean fullyWritten = doWrite();
- _socketChannel.register(_selector, fullyWritten ? SelectionKey.OP_READ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+ _socketChannel.register(_selector,
+ fullyWritten
+ ? SelectionKey.OP_READ
+ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
}
catch (IOException e)
{
@@ -416,6 +421,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
}
else
{
+ _onTransportEncryptionAction.run();
_netInputBuffer.compact();
doRead();
}