diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 15:11:13 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 15:11:13 +0000 |
| commit | 38194151e929fef7fa8adeb08badfa85a17d8404 (patch) | |
| tree | 4db499ef5ef1f14a8506a123dffef2a79baeb8dc /qpid/java/broker-core/src | |
| parent | 503b33b9c8ae17a5dc77b8a306a120ea8cdb31d4 (diff) | |
| download | qpid-python-38194151e929fef7fa8adeb08badfa85a17d8404.tar.gz | |
refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
6 files changed, 1124 insertions, 3 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java index 6e1b6529d8..6100a2eb80 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java @@ -19,7 +19,7 @@ package org.apache.qpid.server.plugin;/* * */ -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 946992cbb6..3b7883b9b9 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -30,7 +30,6 @@ import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java new file mode 100644 index 0000000000..331c2e697d --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -0,0 +1,634 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.security.Principal; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLPeerUnverifiedException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.transport.ByteBufferSender; +import org.apache.qpid.transport.SenderClosedException; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; +import org.apache.qpid.util.SystemUtils; + +public class NonBlockingConnection implements NetworkConnection, ByteBufferSender +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); + private final SocketChannel _socketChannel; + private final long _timeout; + private final Ticker _ticker; + private final SelectorThread _selector; + private int _maxReadIdle; + private int _maxWriteIdle; + private Principal _principal; + private boolean _principalChecked; + private final Object _lock = new Object(); + + public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; + + private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); + private final List<ByteBuffer> _encryptedOutput = new ArrayList<>(); + + private final String _remoteSocketAddress; + private final AtomicBoolean _closed = new AtomicBoolean(false); + private final ServerProtocolEngine _protocolEngine; + private final int _receiveBufSize; + private final Set<TransportEncryption> _encryptionSet; + private final SSLContext _sslContext; + private final Runnable _onTransportEncryptionAction; + private ByteBuffer _netInputBuffer; + private SSLEngine _sslEngine; + + private ByteBuffer _currentBuffer; + + private TransportEncryption _transportEncryption; + private SSLEngineResult _status; + private volatile boolean _fullyWritten = true; + private AtomicBoolean _stateChanged = new AtomicBoolean(); + private boolean _workDone; + + + public NonBlockingConnection(SocketChannel socketChannel, + ServerProtocolEngine delegate, + int sendBufferSize, + int receiveBufferSize, + long timeout, + Ticker ticker, + final Set<TransportEncryption> encryptionSet, + final SSLContext sslContext, + final boolean wantClientAuth, + final boolean needClientAuth, + final Collection<String> enabledCipherSuites, + final Collection<String> disabledCipherSuites, + final Runnable onTransportEncryptionAction, + final SelectorThread selectorThread) + { + _socketChannel = socketChannel; + _timeout = timeout; + _ticker = ticker; + _selector = selectorThread; + + _protocolEngine = delegate; + _receiveBufSize = receiveBufferSize; + _encryptionSet = encryptionSet; + _sslContext = sslContext; + _onTransportEncryptionAction = onTransportEncryptionAction; + + if(encryptionSet.size() == 1) + { + _transportEncryption = _encryptionSet.iterator().next(); + if (_transportEncryption == TransportEncryption.TLS) + { + onTransportEncryptionAction.run(); + } + } + + if(encryptionSet.contains(TransportEncryption.TLS)) + { + _sslEngine = _sslContext.createSSLEngine(); + _sslEngine.setUseClientMode(false); + SSLUtil.removeSSLv3Support(_sslEngine); + SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites); + + if(needClientAuth) + { + _sslEngine.setNeedClientAuth(true); + } + else if(wantClientAuth) + { + _sslEngine.setWantClientAuth(true); + } + _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2)); + } + + try + { + _remoteSocketAddress = _socketChannel.getRemoteAddress().toString(); + _socketChannel.configureBlocking(false); + } + catch (IOException e) + { + throw new SenderException("Unable to prepare the channel for non-blocking IO", e); + } + + + } + + + public Ticker getTicker() + { + return _ticker; + } + + public SocketChannel getSocketChannel() + { + return _socketChannel; + } + + public void start() + { + } + + public ByteBufferSender getSender() + { + return this; + } + + public void close() + { + LOGGER.debug("Closing " + _remoteSocketAddress); + if(_closed.compareAndSet(false,true)) + { + _stateChanged.set(true); + getSelector().wakeup(); + } + } + + public SocketAddress getRemoteAddress() + { + return _socketChannel.socket().getRemoteSocketAddress(); + } + + public SocketAddress getLocalAddress() + { + return _socketChannel.socket().getLocalSocketAddress(); + } + + public void setMaxWriteIdle(int sec) + { + _maxWriteIdle = sec; + } + + public void setMaxReadIdle(int sec) + { + _maxReadIdle = sec; + } + + @Override + public Principal getPeerPrincipal() + { + synchronized (_lock) + { + if(!_principalChecked) + { + if (_sslEngine != null) + { + try + { + _principal = _sslEngine.getSession().getPeerPrincipal(); + } + catch (SSLPeerUnverifiedException e) + { + return null; + } + } + + _principalChecked = true; + } + + return _principal; + } + } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } + + public boolean canRead() + { + return true; + } + + public boolean waitingForWrite() + { + return !_fullyWritten; + } + + public boolean isStateChanged() + { + + return _stateChanged.get(); + } + + public boolean doWork() + { + _stateChanged.set(false); + boolean closed = _closed.get(); + if (!closed) + { + try + { + _workDone = false; + + long currentTime = System.currentTimeMillis(); + int tick = _ticker.getTimeToNextTick(currentTime); + if (tick <= 0) + { + _ticker.tick(currentTime); + } + + _protocolEngine.setMessageAssignmentSuspended(true); + + _protocolEngine.processPendingMessages(); + + _protocolEngine.setTransportBlockedForWriting(!doWrite()); + boolean dataRead = doRead(); + _fullyWritten = doWrite(); + _protocolEngine.setTransportBlockedForWriting(!_fullyWritten); + + if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)) + { + _stateChanged.set(true); + } + + // tell all consumer targets that it is okay to accept more + _protocolEngine.setMessageAssignmentSuspended(false); + } + catch (IOException e) + { + LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); + LOGGER.debug("Closing " + _remoteSocketAddress); + if(_closed.compareAndSet(false,true)) + { + _stateChanged.set(true); + getSelector().wakeup(); + } + } + } + else + { + + if(!SystemUtils.isWindows()) + { + try + { + _socketChannel.shutdownInput(); + } + catch (IOException e) + { + LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e); + + } + } + try + { + while(!doWrite()) + { + } + } + catch (IOException e) + { + LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e); + + } + LOGGER.debug("Closing receiver"); + _protocolEngine.closed(); + + try + { + if(!SystemUtils.isWindows()) + { + _socketChannel.shutdownOutput(); + } + + _socketChannel.close(); + } + catch (IOException e) + { + LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e); + } + } + + return closed; + + } + + public SelectorThread getSelector() + { + return _selector; + } + + public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes) + { + return headerBytes[0] == -128 && + headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[4] == 0 || // SSL 3.0 + headerBytes[4] == 1 || // TLS 1.0 + headerBytes[4] == 2 || // TLS 1.1 + headerBytes[4] == 3); + } + + public boolean doRead() throws IOException + { + boolean readData = false; + if(_transportEncryption == TransportEncryption.NONE) + { + int remaining = 0; + while (remaining == 0 && !_closed.get()) + { + if (_currentBuffer == null || _currentBuffer.remaining() == 0) + { + _currentBuffer = ByteBuffer.allocate(_receiveBufSize); + } + int read = _socketChannel.read(_currentBuffer); + if(read > 0) + { + readData = true; + } + if (read == -1) + { + _closed.set(true); + } + remaining = _currentBuffer.remaining(); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " byte(s)"); + } + ByteBuffer dup = _currentBuffer.duplicate(); + dup.flip(); + _currentBuffer = _currentBuffer.slice(); + _protocolEngine.received(dup); + } + } + else if(_transportEncryption == TransportEncryption.TLS) + { + int read = 1; + while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED)) + { + read = _socketChannel.read(_netInputBuffer); + if (read == -1) + { + _closed.set(true); + } + else if(read > 0) + { + readData = true; + } + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " encrypted bytes "); + } + + _netInputBuffer.flip(); + + + int unwrapped = 0; + boolean tasksRun; + do + { + ByteBuffer appInputBuffer = + ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50); + + _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer); + tasksRun = runSSLEngineTasks(_status); + + appInputBuffer.flip(); + unwrapped = appInputBuffer.remaining(); + if(unwrapped > 0) + { + readData = true; + } + _protocolEngine.received(appInputBuffer); + } + while(unwrapped > 0 || tasksRun); + + _netInputBuffer.compact(); + + } + } + else + { + int read = 1; + while (!_closed.get() && read > 0) + { + + read = _socketChannel.read(_netInputBuffer); + if (read == -1) + { + _closed.set(true); + } + + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer); + } + + if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK) + { + _netInputBuffer.flip(); + final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK]; + ByteBuffer dup = _netInputBuffer.duplicate(); + dup.get(headerBytes); + + _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE; + LOGGER.debug("Identified transport encryption as " + _transportEncryption); + + if (_transportEncryption == TransportEncryption.NONE) + { + _protocolEngine.received(_netInputBuffer); + } + else + { + _onTransportEncryptionAction.run(); + _netInputBuffer.compact(); + readData = doRead(); + } + break; + } + } + } + return readData; + } + + public boolean doWrite() throws IOException + { + + ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; + Iterator<ByteBuffer> bufferIterator = _buffers.iterator(); + for (int i = 0; i < bufArray.length; i++) + { + bufArray[i] = bufferIterator.next(); + } + + int byteBuffersWritten = 0; + + if(_transportEncryption == TransportEncryption.NONE) + { + + + long written = _socketChannel.write(bufArray); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " bytes"); + } + + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + + + return bufArray.length == byteBuffersWritten; + } + else if(_transportEncryption == TransportEncryption.TLS) + { + int remaining = 0; + do + { + if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) + { + _workDone = true; + final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); + _status = _sslEngine.wrap(bufArray, netBuffer); + runSSLEngineTasks(_status); + + netBuffer.flip(); + remaining = netBuffer.remaining(); + if (remaining != 0) + { + _encryptedOutput.add(netBuffer); + } + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + } + + } + while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP); + ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); + long written = _socketChannel.write(encryptedBuffers); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " encrypted bytes"); + } + ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator(); + while(iter.hasNext()) + { + ByteBuffer buf = iter.next(); + if(buf.remaining() == 0) + { + iter.remove(); + } + else + { + break; + } + } + + return bufArray.length == byteBuffersWritten; + + } + else + { + return true; + } + } + + public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes) + { + return headerBytes[0] == 22 && // SSL Handshake + (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[2] == 0 || // SSL 3.0 + headerBytes[2] == 1 || // TLS 1.0 + headerBytes[2] == 2 || // TLS 1.1 + headerBytes[2] == 3)) && // TLS1.2 + (headerBytes[5] == 1); // client_hello + } + + public boolean runSSLEngineTasks(final SSLEngineResult status) + { + if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) + { + Runnable task; + while((task = _sslEngine.getDelegatedTask()) != null) + { + task.run(); + } + return true; + } + return false; + } + + public boolean looksLikeSSL(final byte[] headerBytes) + { + return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); + } + + @Override + public void send(final ByteBuffer msg) + { + if (_closed.get()) + { + throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed"); + } + // append to list and do selector wakeup + _buffers.add(msg); + _stateChanged.set(true); + } + + @Override + public void flush() + { + _stateChanged.set(true); + getSelector().wakeup(); + + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java new file mode 100644 index 0000000000..79313712a5 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Set; + +import javax.net.ssl.SSLContext; + +import org.slf4j.LoggerFactory; + +import org.apache.qpid.configuration.CommonProperties; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.protocol.ServerProtocolEngine; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.io.AbstractNetworkTransport; +import org.apache.qpid.transport.network.io.IdleTimeoutTicker; + +public class NonBlockingNetworkTransport +{ + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); + private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, + CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); + private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , + CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); + private SelectorThread _selector; + + + private Set<TransportEncryption> _encryptionSet; + private volatile boolean _closed = false; + private NetworkTransportConfiguration _config; + private ProtocolEngineFactory _factory; + private SSLContext _sslContext; + private ServerSocketChannel _serverSocket; + private int _timeout; + + public void close() + { + if(_selector != null) + { + try + { + if (_serverSocket != null) + { + _selector.cancelAcceptingSocket(_serverSocket); + _serverSocket.close(); + } + } + catch (IOException e) + { + // TODO + e.printStackTrace(); + } + finally + { + + _selector.close(); + } + } + } + + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext, + final Set<TransportEncryption> encryptionSet) + { + try + { + + _config = config; + _factory = factory; + _sslContext = sslContext; + _timeout = TIMEOUT; + + InetSocketAddress address = config.getAddress(); + + _serverSocket = ServerSocketChannel.open(); + + _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true); + _serverSocket.bind(address); + _serverSocket.configureBlocking(false); + _encryptionSet = encryptionSet; + + _selector = new SelectorThread(config.getAddress().toString(), this); + _selector.start(); + _selector.addAcceptingSocket(_serverSocket); + } + catch (IOException e) + { + throw new TransportException("Failed to start AMQP on port : " + config, e); + } + + + } + + public int getAcceptingPort() + { + return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort(); + } + + public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException + { + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() + .getRemoteSocketAddress()); + + if(engine != null) + { + socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NonBlockingConnection connection = + new NonBlockingConnection(socketChannel, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker, + _encryptionSet, + _sslContext, + _config.wantClientAuth(), + _config.needClientAuth(), + _config.getEnabledCipherSuites(), + _config.getDisabledCipherSuites(), + new Runnable() + { + + @Override + public void run() + { + engine.encryptedTransport(); + } + }, + _selector); + + engine.setNetworkConnection(connection, connection.getSender()); + connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); + + ticker.setConnection(connection); + + connection.start(); + + _selector.addConnection(connection); + + } + else + { + socketChannel.close(); + } + } + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java new file mode 100644 index 0000000000..786f1915a7 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.transport; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** +* Created by keith on 28/01/2015. +*/ +public class SelectorThread extends Thread +{ + private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); + private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); + private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); + private final Selector _selector; + private final AtomicBoolean _closed = new AtomicBoolean(); + private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); + private final NonBlockingNetworkTransport _transport; + + SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport) + { + super("SelectorThread-"+name); + _transport = nonBlockingNetworkTransport; + try + { + _selector = Selector.open(); + } + catch (IOException e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public void addAcceptingSocket(final ServerSocketChannel socketChannel) + { + _tasks.add(new Runnable() + { + @Override + public void run() + { + + try + { + socketChannel.register(_selector, SelectionKey.OP_ACCEPT); + } + catch (ClosedChannelException e) + { + // TODO + e.printStackTrace(); + } + } + }); + _selector.wakeup(); + } + + public void cancelAcceptingSocket(final ServerSocketChannel socketChannel) + { + _tasks.add(new Runnable() + { + @Override + public void run() + { + SelectionKey selectionKey = socketChannel.keyFor(_selector); + if(selectionKey != null) + { + selectionKey.cancel(); + } + } + }); + _selector.wakeup(); + } + + @Override + public void run() + { + + long nextTimeout = 0; + + try + { + while (!_closed.get()) + { + + _selector.select(nextTimeout); + + while(_tasks.peek() != null) + { + Runnable task = _tasks.poll(); + task.run(); + } + + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); + + + Set<SelectionKey> selectionKeys = _selector.selectedKeys(); + for (SelectionKey key : selectionKeys) + { + if(key.isAcceptable()) + { + // todo - should we schedule this rather than running in this thread? + SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept(); + _transport.acceptSocketChannel(acceptedChannel); + } + else + { + NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); + + key.channel().register(_selector, 0); + + toBeScheduled.add(connection); + _unscheduledConnections.remove(connection); + } + + } + selectionKeys.clear(); + + while (_unregisteredConnections.peek() != null) + { + NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); + _unscheduledConnections.add(unregisteredConnection); + + + final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) + | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); + unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); + + } + + long currentTime = System.currentTimeMillis(); + Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); + nextTimeout = Integer.MAX_VALUE; + while (iterator.hasNext()) + { + NonBlockingConnection connection = iterator.next(); + + int period = connection.getTicker().getTimeToNextTick(currentTime); + if (period < 0 || connection.isStateChanged()) + { + toBeScheduled.add(connection); + connection.getSocketChannel().register(_selector, 0).cancel(); + iterator.remove(); + } + else + { + nextTimeout = Math.min(period, nextTimeout); + } + } + + for (NonBlockingConnection connection : toBeScheduled) + { + _scheduler.schedule(connection); + } + + } + } + catch (IOException e) + { + //TODO + e.printStackTrace(); + } + finally + { + try + { + _selector.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + + + + } + + public void addConnection(final NonBlockingConnection connection) + { + _unregisteredConnections.add(connection); + _selector.wakeup(); + + } + + public void wakeup() + { + _selector.wakeup(); + } + + public void close() + { + _closed.set(true); + _selector.wakeup(); + _scheduler.close(); + } + + private class NetworkConnectionScheduler + { + private final ScheduledThreadPoolExecutor _executor; + private final AtomicInteger _running = new AtomicInteger(); + private final int _poolSize; + + private NetworkConnectionScheduler() + { + _poolSize = Runtime.getRuntime().availableProcessors(); + _executor = new ScheduledThreadPoolExecutor(_poolSize); + _executor.prestartAllCoreThreads(); + } + + public void processConnection(final NonBlockingConnection connection) + { + try + { + _running.incrementAndGet(); + boolean rerun; + do + { + rerun = false; + boolean closed = connection.doWork(); + + if (!closed) + { + + if (connection.isStateChanged()) + { + if (_running.get() == _poolSize) + { + schedule(connection); + } + else + { + rerun = true; + } + } + else + { + SelectorThread.this.addConnection(connection); + } + } + + } while (rerun); + } + finally + { + _running.decrementAndGet(); + } + } + + public void schedule(final NonBlockingConnection connection) + { + _executor.submit(new Runnable() + { + @Override + public void run() + { + processConnection(connection); + } + }); + } + + public void close() + { + _executor.shutdown(); + } + + + + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java index 5f5467db07..7874437a2f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java @@ -36,7 +36,6 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.network.TransportEncryption; -import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport; class TCPandSSLTransport implements AcceptingTransport { |
