summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-11 15:11:13 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-11 15:11:13 +0000
commit38194151e929fef7fa8adeb08badfa85a17d8404 (patch)
tree4db499ef5ef1f14a8506a123dffef2a79baeb8dc /qpid/java/broker-core/src
parent503b33b9c8ae17a5dc77b8a306a120ea8cdb31d4 (diff)
downloadqpid-python-38194151e929fef7fa8adeb08badfa85a17d8404.tar.gz
refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java634
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java187
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java302
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java1
6 files changed, 1124 insertions, 3 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
index 6e1b6529d8..6100a2eb80 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
@@ -19,7 +19,7 @@ package org.apache.qpid.server.plugin;/*
*
*/
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 946992cbb6..3b7883b9b9 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -30,7 +30,6 @@ import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
new file mode 100644
index 0000000000..331c2e697d
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -0,0 +1,634 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.SenderClosedException;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+import org.apache.qpid.util.SystemUtils;
+
+public class NonBlockingConnection implements NetworkConnection, ByteBufferSender
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+ private final SocketChannel _socketChannel;
+ private final long _timeout;
+ private final Ticker _ticker;
+ private final SelectorThread _selector;
+ private int _maxReadIdle;
+ private int _maxWriteIdle;
+ private Principal _principal;
+ private boolean _principalChecked;
+ private final Object _lock = new Object();
+
+ public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+
+ private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+ private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
+
+ private final String _remoteSocketAddress;
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final ServerProtocolEngine _protocolEngine;
+ private final int _receiveBufSize;
+ private final Set<TransportEncryption> _encryptionSet;
+ private final SSLContext _sslContext;
+ private final Runnable _onTransportEncryptionAction;
+ private ByteBuffer _netInputBuffer;
+ private SSLEngine _sslEngine;
+
+ private ByteBuffer _currentBuffer;
+
+ private TransportEncryption _transportEncryption;
+ private SSLEngineResult _status;
+ private volatile boolean _fullyWritten = true;
+ private AtomicBoolean _stateChanged = new AtomicBoolean();
+ private boolean _workDone;
+
+
+ public NonBlockingConnection(SocketChannel socketChannel,
+ ServerProtocolEngine delegate,
+ int sendBufferSize,
+ int receiveBufferSize,
+ long timeout,
+ Ticker ticker,
+ final Set<TransportEncryption> encryptionSet,
+ final SSLContext sslContext,
+ final boolean wantClientAuth,
+ final boolean needClientAuth,
+ final Collection<String> enabledCipherSuites,
+ final Collection<String> disabledCipherSuites,
+ final Runnable onTransportEncryptionAction,
+ final SelectorThread selectorThread)
+ {
+ _socketChannel = socketChannel;
+ _timeout = timeout;
+ _ticker = ticker;
+ _selector = selectorThread;
+
+ _protocolEngine = delegate;
+ _receiveBufSize = receiveBufferSize;
+ _encryptionSet = encryptionSet;
+ _sslContext = sslContext;
+ _onTransportEncryptionAction = onTransportEncryptionAction;
+
+ if(encryptionSet.size() == 1)
+ {
+ _transportEncryption = _encryptionSet.iterator().next();
+ if (_transportEncryption == TransportEncryption.TLS)
+ {
+ onTransportEncryptionAction.run();
+ }
+ }
+
+ if(encryptionSet.contains(TransportEncryption.TLS))
+ {
+ _sslEngine = _sslContext.createSSLEngine();
+ _sslEngine.setUseClientMode(false);
+ SSLUtil.removeSSLv3Support(_sslEngine);
+ SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites);
+
+ if(needClientAuth)
+ {
+ _sslEngine.setNeedClientAuth(true);
+ }
+ else if(wantClientAuth)
+ {
+ _sslEngine.setWantClientAuth(true);
+ }
+ _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
+ }
+
+ try
+ {
+ _remoteSocketAddress = _socketChannel.getRemoteAddress().toString();
+ _socketChannel.configureBlocking(false);
+ }
+ catch (IOException e)
+ {
+ throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
+ }
+
+
+ }
+
+
+ public Ticker getTicker()
+ {
+ return _ticker;
+ }
+
+ public SocketChannel getSocketChannel()
+ {
+ return _socketChannel;
+ }
+
+ public void start()
+ {
+ }
+
+ public ByteBufferSender getSender()
+ {
+ return this;
+ }
+
+ public void close()
+ {
+ LOGGER.debug("Closing " + _remoteSocketAddress);
+ if(_closed.compareAndSet(false,true))
+ {
+ _stateChanged.set(true);
+ getSelector().wakeup();
+ }
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _socketChannel.socket().getRemoteSocketAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _socketChannel.socket().getLocalSocketAddress();
+ }
+
+ public void setMaxWriteIdle(int sec)
+ {
+ _maxWriteIdle = sec;
+ }
+
+ public void setMaxReadIdle(int sec)
+ {
+ _maxReadIdle = sec;
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ synchronized (_lock)
+ {
+ if(!_principalChecked)
+ {
+ if (_sslEngine != null)
+ {
+ try
+ {
+ _principal = _sslEngine.getSession().getPeerPrincipal();
+ }
+ catch (SSLPeerUnverifiedException e)
+ {
+ return null;
+ }
+ }
+
+ _principalChecked = true;
+ }
+
+ return _principal;
+ }
+ }
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
+
+ public boolean canRead()
+ {
+ return true;
+ }
+
+ public boolean waitingForWrite()
+ {
+ return !_fullyWritten;
+ }
+
+ public boolean isStateChanged()
+ {
+
+ return _stateChanged.get();
+ }
+
+ public boolean doWork()
+ {
+ _stateChanged.set(false);
+ boolean closed = _closed.get();
+ if (!closed)
+ {
+ try
+ {
+ _workDone = false;
+
+ long currentTime = System.currentTimeMillis();
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if (tick <= 0)
+ {
+ _ticker.tick(currentTime);
+ }
+
+ _protocolEngine.setMessageAssignmentSuspended(true);
+
+ _protocolEngine.processPendingMessages();
+
+ _protocolEngine.setTransportBlockedForWriting(!doWrite());
+ boolean dataRead = doRead();
+ _fullyWritten = doWrite();
+ _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
+
+ if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0))
+ {
+ _stateChanged.set(true);
+ }
+
+ // tell all consumer targets that it is okay to accept more
+ _protocolEngine.setMessageAssignmentSuspended(false);
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
+ LOGGER.debug("Closing " + _remoteSocketAddress);
+ if(_closed.compareAndSet(false,true))
+ {
+ _stateChanged.set(true);
+ getSelector().wakeup();
+ }
+ }
+ }
+ else
+ {
+
+ if(!SystemUtils.isWindows())
+ {
+ try
+ {
+ _socketChannel.shutdownInput();
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e);
+
+ }
+ }
+ try
+ {
+ while(!doWrite())
+ {
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
+
+ }
+ LOGGER.debug("Closing receiver");
+ _protocolEngine.closed();
+
+ try
+ {
+ if(!SystemUtils.isWindows())
+ {
+ _socketChannel.shutdownOutput();
+ }
+
+ _socketChannel.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e);
+ }
+ }
+
+ return closed;
+
+ }
+
+ public SelectorThread getSelector()
+ {
+ return _selector;
+ }
+
+ public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
+ {
+ return headerBytes[0] == -128 &&
+ headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[4] == 0 || // SSL 3.0
+ headerBytes[4] == 1 || // TLS 1.0
+ headerBytes[4] == 2 || // TLS 1.1
+ headerBytes[4] == 3);
+ }
+
+ public boolean doRead() throws IOException
+ {
+ boolean readData = false;
+ if(_transportEncryption == TransportEncryption.NONE)
+ {
+ int remaining = 0;
+ while (remaining == 0 && !_closed.get())
+ {
+ if (_currentBuffer == null || _currentBuffer.remaining() == 0)
+ {
+ _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
+ }
+ int read = _socketChannel.read(_currentBuffer);
+ if(read > 0)
+ {
+ readData = true;
+ }
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+ remaining = _currentBuffer.remaining();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + read + " byte(s)");
+ }
+ ByteBuffer dup = _currentBuffer.duplicate();
+ dup.flip();
+ _currentBuffer = _currentBuffer.slice();
+ _protocolEngine.received(dup);
+ }
+ }
+ else if(_transportEncryption == TransportEncryption.TLS)
+ {
+ int read = 1;
+ while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
+ {
+ read = _socketChannel.read(_netInputBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+ else if(read > 0)
+ {
+ readData = true;
+ }
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + read + " encrypted bytes ");
+ }
+
+ _netInputBuffer.flip();
+
+
+ int unwrapped = 0;
+ boolean tasksRun;
+ do
+ {
+ ByteBuffer appInputBuffer =
+ ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+
+ _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
+ tasksRun = runSSLEngineTasks(_status);
+
+ appInputBuffer.flip();
+ unwrapped = appInputBuffer.remaining();
+ if(unwrapped > 0)
+ {
+ readData = true;
+ }
+ _protocolEngine.received(appInputBuffer);
+ }
+ while(unwrapped > 0 || tasksRun);
+
+ _netInputBuffer.compact();
+
+ }
+ }
+ else
+ {
+ int read = 1;
+ while (!_closed.get() && read > 0)
+ {
+
+ read = _socketChannel.read(_netInputBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
+ }
+
+ if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
+ {
+ _netInputBuffer.flip();
+ final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
+ ByteBuffer dup = _netInputBuffer.duplicate();
+ dup.get(headerBytes);
+
+ _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE;
+ LOGGER.debug("Identified transport encryption as " + _transportEncryption);
+
+ if (_transportEncryption == TransportEncryption.NONE)
+ {
+ _protocolEngine.received(_netInputBuffer);
+ }
+ else
+ {
+ _onTransportEncryptionAction.run();
+ _netInputBuffer.compact();
+ readData = doRead();
+ }
+ break;
+ }
+ }
+ }
+ return readData;
+ }
+
+ public boolean doWrite() throws IOException
+ {
+
+ ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
+ Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
+ for (int i = 0; i < bufArray.length; i++)
+ {
+ bufArray[i] = bufferIterator.next();
+ }
+
+ int byteBuffersWritten = 0;
+
+ if(_transportEncryption == TransportEncryption.NONE)
+ {
+
+
+ long written = _socketChannel.write(bufArray);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + written + " bytes");
+ }
+
+ for (ByteBuffer buf : bufArray)
+ {
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _buffers.poll();
+ }
+ }
+
+
+ return bufArray.length == byteBuffersWritten;
+ }
+ else if(_transportEncryption == TransportEncryption.TLS)
+ {
+ int remaining = 0;
+ do
+ {
+ if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
+ {
+ _workDone = true;
+ final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+ _status = _sslEngine.wrap(bufArray, netBuffer);
+ runSSLEngineTasks(_status);
+
+ netBuffer.flip();
+ remaining = netBuffer.remaining();
+ if (remaining != 0)
+ {
+ _encryptedOutput.add(netBuffer);
+ }
+ for (ByteBuffer buf : bufArray)
+ {
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _buffers.poll();
+ }
+ }
+ }
+
+ }
+ while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+ ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
+ long written = _socketChannel.write(encryptedBuffers);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + written + " encrypted bytes");
+ }
+ ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
+ while(iter.hasNext())
+ {
+ ByteBuffer buf = iter.next();
+ if(buf.remaining() == 0)
+ {
+ iter.remove();
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return bufArray.length == byteBuffersWritten;
+
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
+ {
+ return headerBytes[0] == 22 && // SSL Handshake
+ (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[2] == 0 || // SSL 3.0
+ headerBytes[2] == 1 || // TLS 1.0
+ headerBytes[2] == 2 || // TLS 1.1
+ headerBytes[2] == 3)) && // TLS1.2
+ (headerBytes[5] == 1); // client_hello
+ }
+
+ public boolean runSSLEngineTasks(final SSLEngineResult status)
+ {
+ if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+ {
+ Runnable task;
+ while((task = _sslEngine.getDelegatedTask()) != null)
+ {
+ task.run();
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public boolean looksLikeSSL(final byte[] headerBytes)
+ {
+ return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
+ }
+
+ @Override
+ public void send(final ByteBuffer msg)
+ {
+ if (_closed.get())
+ {
+ throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed");
+ }
+ // append to list and do selector wakeup
+ _buffers.add(msg);
+ _stateChanged.set(true);
+ }
+
+ @Override
+ public void flush()
+ {
+ _stateChanged.set(true);
+ getSelector().wakeup();
+
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
new file mode 100644
index 0000000000..79313712a5
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.io.AbstractNetworkTransport;
+import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
+
+public class NonBlockingNetworkTransport
+{
+
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+ private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+ private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+ private SelectorThread _selector;
+
+
+ private Set<TransportEncryption> _encryptionSet;
+ private volatile boolean _closed = false;
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContext;
+ private ServerSocketChannel _serverSocket;
+ private int _timeout;
+
+ public void close()
+ {
+ if(_selector != null)
+ {
+ try
+ {
+ if (_serverSocket != null)
+ {
+ _selector.cancelAcceptingSocket(_serverSocket);
+ _serverSocket.close();
+ }
+ }
+ catch (IOException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ finally
+ {
+
+ _selector.close();
+ }
+ }
+ }
+
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext,
+ final Set<TransportEncryption> encryptionSet)
+ {
+ try
+ {
+
+ _config = config;
+ _factory = factory;
+ _sslContext = sslContext;
+ _timeout = TIMEOUT;
+
+ InetSocketAddress address = config.getAddress();
+
+ _serverSocket = ServerSocketChannel.open();
+
+ _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ _serverSocket.bind(address);
+ _serverSocket.configureBlocking(false);
+ _encryptionSet = encryptionSet;
+
+ _selector = new SelectorThread(config.getAddress().toString(), this);
+ _selector.start();
+ _selector.addAcceptingSocket(_serverSocket);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Failed to start AMQP on port : " + config, e);
+ }
+
+
+ }
+
+ public int getAcceptingPort()
+ {
+ return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort();
+ }
+
+ public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
+ {
+ final ServerProtocolEngine engine =
+ (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
+ .getRemoteSocketAddress());
+
+ if(engine != null)
+ {
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+ socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+ NonBlockingConnection connection =
+ new NonBlockingConnection(socketChannel,
+ engine,
+ sendBufferSize,
+ receiveBufferSize,
+ _timeout,
+ ticker,
+ _encryptionSet,
+ _sslContext,
+ _config.wantClientAuth(),
+ _config.needClientAuth(),
+ _config.getEnabledCipherSuites(),
+ _config.getDisabledCipherSuites(),
+ new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ engine.encryptedTransport();
+ }
+ },
+ _selector);
+
+ engine.setNetworkConnection(connection, connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+
+ ticker.setConnection(connection);
+
+ connection.start();
+
+ _selector.addConnection(connection);
+
+ }
+ else
+ {
+ socketChannel.close();
+ }
+ }
+
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
new file mode 100644
index 0000000000..786f1915a7
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+* Created by keith on 28/01/2015.
+*/
+public class SelectorThread extends Thread
+{
+ private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
+ private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
+ private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
+ private final Selector _selector;
+ private final AtomicBoolean _closed = new AtomicBoolean();
+ private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
+ private final NonBlockingNetworkTransport _transport;
+
+ SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport)
+ {
+ super("SelectorThread-"+name);
+ _transport = nonBlockingNetworkTransport;
+ try
+ {
+ _selector = Selector.open();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void addAcceptingSocket(final ServerSocketChannel socketChannel)
+ {
+ _tasks.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
+ try
+ {
+ socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
+ }
+ catch (ClosedChannelException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ }
+ });
+ _selector.wakeup();
+ }
+
+ public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+ {
+ _tasks.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ SelectionKey selectionKey = socketChannel.keyFor(_selector);
+ if(selectionKey != null)
+ {
+ selectionKey.cancel();
+ }
+ }
+ });
+ _selector.wakeup();
+ }
+
+ @Override
+ public void run()
+ {
+
+ long nextTimeout = 0;
+
+ try
+ {
+ while (!_closed.get())
+ {
+
+ _selector.select(nextTimeout);
+
+ while(_tasks.peek() != null)
+ {
+ Runnable task = _tasks.poll();
+ task.run();
+ }
+
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+
+
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectionKeys)
+ {
+ if(key.isAcceptable())
+ {
+ // todo - should we schedule this rather than running in this thread?
+ SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept();
+ _transport.acceptSocketChannel(acceptedChannel);
+ }
+ else
+ {
+ NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+
+ key.channel().register(_selector, 0);
+
+ toBeScheduled.add(connection);
+ _unscheduledConnections.remove(connection);
+ }
+
+ }
+ selectionKeys.clear();
+
+ while (_unregisteredConnections.peek() != null)
+ {
+ NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
+ _unscheduledConnections.add(unregisteredConnection);
+
+
+ final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+ | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+ unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+
+ }
+
+ long currentTime = System.currentTimeMillis();
+ Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
+ nextTimeout = Integer.MAX_VALUE;
+ while (iterator.hasNext())
+ {
+ NonBlockingConnection connection = iterator.next();
+
+ int period = connection.getTicker().getTimeToNextTick(currentTime);
+ if (period < 0 || connection.isStateChanged())
+ {
+ toBeScheduled.add(connection);
+ connection.getSocketChannel().register(_selector, 0).cancel();
+ iterator.remove();
+ }
+ else
+ {
+ nextTimeout = Math.min(period, nextTimeout);
+ }
+ }
+
+ for (NonBlockingConnection connection : toBeScheduled)
+ {
+ _scheduler.schedule(connection);
+ }
+
+ }
+ }
+ catch (IOException e)
+ {
+ //TODO
+ e.printStackTrace();
+ }
+ finally
+ {
+ try
+ {
+ _selector.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+
+
+
+ }
+
+ public void addConnection(final NonBlockingConnection connection)
+ {
+ _unregisteredConnections.add(connection);
+ _selector.wakeup();
+
+ }
+
+ public void wakeup()
+ {
+ _selector.wakeup();
+ }
+
+ public void close()
+ {
+ _closed.set(true);
+ _selector.wakeup();
+ _scheduler.close();
+ }
+
+ private class NetworkConnectionScheduler
+ {
+ private final ScheduledThreadPoolExecutor _executor;
+ private final AtomicInteger _running = new AtomicInteger();
+ private final int _poolSize;
+
+ private NetworkConnectionScheduler()
+ {
+ _poolSize = Runtime.getRuntime().availableProcessors();
+ _executor = new ScheduledThreadPoolExecutor(_poolSize);
+ _executor.prestartAllCoreThreads();
+ }
+
+ public void processConnection(final NonBlockingConnection connection)
+ {
+ try
+ {
+ _running.incrementAndGet();
+ boolean rerun;
+ do
+ {
+ rerun = false;
+ boolean closed = connection.doWork();
+
+ if (!closed)
+ {
+
+ if (connection.isStateChanged())
+ {
+ if (_running.get() == _poolSize)
+ {
+ schedule(connection);
+ }
+ else
+ {
+ rerun = true;
+ }
+ }
+ else
+ {
+ SelectorThread.this.addConnection(connection);
+ }
+ }
+
+ } while (rerun);
+ }
+ finally
+ {
+ _running.decrementAndGet();
+ }
+ }
+
+ public void schedule(final NonBlockingConnection connection)
+ {
+ _executor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ processConnection(connection);
+ }
+ });
+ }
+
+ public void close()
+ {
+ _executor.shutdown();
+ }
+
+
+
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
index 5f5467db07..7874437a2f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
@@ -36,7 +36,6 @@ import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
class TCPandSSLTransport implements AcceptingTransport
{