diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 15:11:13 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 15:11:13 +0000 |
| commit | 38194151e929fef7fa8adeb08badfa85a17d8404 (patch) | |
| tree | 4db499ef5ef1f14a8506a123dffef2a79baeb8dc /qpid/java/common | |
| parent | 503b33b9c8ae17a5dc77b8a306a120ea8cdb31d4 (diff) | |
| download | qpid-python-38194151e929fef7fa8adeb08badfa85a17d8404.tar.gz | |
refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
6 files changed, 1 insertions, 1269 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java deleted file mode 100644 index df4d7c7721..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.protocol; - -import javax.security.auth.Subject; - -public interface ServerProtocolEngine extends ProtocolEngine -{ - /** - * Gets the connection ID associated with this ProtocolEngine - */ - long getConnectionId(); - - Subject getSubject(); - - boolean isTransportBlockedForWriting(); - - void setTransportBlockedForWriting(boolean blocked); - - void setMessageAssignmentSuspended(boolean value); - - boolean isMessageAssignmentSuspended(); - - void processPendingMessages(); -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java index 54a2a360bb..71704fca3a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java @@ -25,7 +25,7 @@ import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.TransportActivity; -class IdleTimeoutTicker implements Ticker +public class IdleTimeoutTicker implements Ticker { private final TransportActivity _transport; private final int _defaultTimeout; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java deleted file mode 100644 index 7c7c6929d1..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java +++ /dev/null @@ -1,187 +0,0 @@ -/* -* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import java.net.SocketAddress; -import java.nio.channels.SocketChannel; -import java.security.Principal; -import java.util.Collection; -import java.util.Set; - -import javax.net.ssl.SSLContext; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.ByteBufferSender; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.Ticker; -import org.apache.qpid.transport.network.TransportEncryption; - -public class NonBlockingConnection implements NetworkConnection -{ - private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); - private final SocketChannel _socketChannel; - private final long _timeout; - private final NonBlockingSenderReceiver _nonBlockingSenderReceiver; - private final Ticker _ticker; - private final SelectorThread _selector; - private int _maxReadIdle; - private int _maxWriteIdle; - private Principal _principal; - private boolean _principalChecked; - private final Object _lock = new Object(); - - public NonBlockingConnection(SocketChannel socketChannel, - ServerProtocolEngine delegate, - int sendBufferSize, - int receiveBufferSize, - long timeout, - Ticker ticker, - final Set<TransportEncryption> encryptionSet, - final SSLContext sslContext, - final boolean wantClientAuth, - final boolean needClientAuth, - final Collection<String> enabledCipherSuites, - final Collection<String> disabledCipherSuites, - final Runnable onTransportEncryptionAction, - final SelectorThread selectorThread) - { - _socketChannel = socketChannel; - _timeout = timeout; - _ticker = ticker; - _selector = selectorThread; - - _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(this, - delegate, - receiveBufferSize, - ticker, - encryptionSet, - sslContext, - wantClientAuth, - needClientAuth, - enabledCipherSuites, - disabledCipherSuites, - onTransportEncryptionAction); - - } - - - public Ticker getTicker() - { - return _ticker; - } - - public SocketChannel getSocketChannel() - { - return _socketChannel; - } - - public void start() - { - } - - public ByteBufferSender getSender() - { - return _nonBlockingSenderReceiver; - } - - public void close() - { - _nonBlockingSenderReceiver.close(); - } - - public SocketAddress getRemoteAddress() - { - return _socketChannel.socket().getRemoteSocketAddress(); - } - - public SocketAddress getLocalAddress() - { - return _socketChannel.socket().getLocalSocketAddress(); - } - - public void setMaxWriteIdle(int sec) - { - _maxWriteIdle = sec; - } - - public void setMaxReadIdle(int sec) - { - _maxReadIdle = sec; - } - - @Override - public Principal getPeerPrincipal() - { - synchronized (_lock) - { - if(!_principalChecked) - { - - _principal = _nonBlockingSenderReceiver.getPeerPrincipal(); - - _principalChecked = true; - } - - return _principal; - } - } - - @Override - public int getMaxReadIdle() - { - return _maxReadIdle; - } - - @Override - public int getMaxWriteIdle() - { - return _maxWriteIdle; - } - - public boolean canRead() - { - return _nonBlockingSenderReceiver.canRead(); - } - - public boolean waitingForWrite() - { - return _nonBlockingSenderReceiver.waitingForWrite(); - } - - public boolean isStateChanged() - { - - return _nonBlockingSenderReceiver.isStateChanged(); - } - - public boolean doWork() - { - return _nonBlockingSenderReceiver.doWork(); - } - - public SelectorThread getSelector() - { - return _selector; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java deleted file mode 100644 index 1c49efc294..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.StandardSocketOptions; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.Set; - -import javax.net.ssl.SSLContext; - -import org.slf4j.LoggerFactory; - -import org.apache.qpid.configuration.CommonProperties; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.TransportEncryption; - -public class NonBlockingNetworkTransport -{ - - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); - private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, - CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); - private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , - CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); - private SelectorThread _selector; - - - private Set<TransportEncryption> _encryptionSet; - private volatile boolean _closed = false; - private NetworkTransportConfiguration _config; - private ProtocolEngineFactory _factory; - private SSLContext _sslContext; - private ServerSocketChannel _serverSocket; - private int _timeout; - - public void close() - { - if(_selector != null) - { - try - { - if (_serverSocket != null) - { - _selector.cancelAcceptingSocket(_serverSocket); - _serverSocket.close(); - } - } - catch (IOException e) - { - // TODO - e.printStackTrace(); - } - finally - { - - _selector.close(); - } - } - } - - public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext, - final Set<TransportEncryption> encryptionSet) - { - try - { - - _config = config; - _factory = factory; - _sslContext = sslContext; - _timeout = TIMEOUT; - - InetSocketAddress address = config.getAddress(); - - _serverSocket = ServerSocketChannel.open(); - - _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true); - _serverSocket.bind(address); - _serverSocket.configureBlocking(false); - _encryptionSet = encryptionSet; - - _selector = new SelectorThread(config.getAddress().toString(), this); - _selector.start(); - _selector.addAcceptingSocket(_serverSocket); - } - catch (IOException e) - { - throw new TransportException("Failed to start AMQP on port : " + config, e); - } - - - } - - public int getAcceptingPort() - { - return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort(); - } - - public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException - { - final ServerProtocolEngine engine = - (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() - .getRemoteSocketAddress()); - - if(engine != null) - { - socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); - socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); - - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); - - socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - - - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - - NonBlockingConnection connection = - new NonBlockingConnection(socketChannel, - engine, - sendBufferSize, - receiveBufferSize, - _timeout, - ticker, - _encryptionSet, - _sslContext, - _config.wantClientAuth(), - _config.needClientAuth(), - _config.getEnabledCipherSuites(), - _config.getDisabledCipherSuites(), - new Runnable() - { - - @Override - public void run() - { - engine.encryptedTransport(); - } - }, - _selector); - - engine.setNetworkConnection(connection, connection.getSender()); - connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); - - ticker.setConnection(connection); - - connection.start(); - - _selector.addConnection(connection); - - } - else - { - socketChannel.close(); - } - } - - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java deleted file mode 100644 index ee99233063..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ /dev/null @@ -1,551 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.transport.network.io; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.security.Principal; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.ListIterator; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLPeerUnverifiedException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.ByteBufferSender; -import org.apache.qpid.transport.SenderClosedException; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.network.Ticker; -import org.apache.qpid.transport.network.TransportEncryption; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; -import org.apache.qpid.util.SystemUtils; - -public class NonBlockingSenderReceiver implements ByteBufferSender -{ - private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class); - public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; - - private final SocketChannel _socketChannel; - - private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); - private final List<ByteBuffer> _encryptedOutput = new ArrayList<>(); - - private final String _remoteSocketAddress; - private final AtomicBoolean _closed = new AtomicBoolean(false); - private final ServerProtocolEngine _protocolEngine; - private final int _receiveBufSize; - private final Ticker _ticker; - private final Set<TransportEncryption> _encryptionSet; - private final SSLContext _sslContext; - private final Runnable _onTransportEncryptionAction; - private final NonBlockingConnection _connection; - private ByteBuffer _netInputBuffer; - private SSLEngine _sslEngine; - - private ByteBuffer _currentBuffer; - - private TransportEncryption _transportEncryption; - private SSLEngineResult _status; - private volatile boolean _fullyWritten = true; - private AtomicBoolean _stateChanged = new AtomicBoolean(); - private boolean _workDone; - - - public NonBlockingSenderReceiver(final NonBlockingConnection connection, - ServerProtocolEngine protocolEngine, - int receiveBufSize, - Ticker ticker, - final Set<TransportEncryption> encryptionSet, - final SSLContext sslContext, - final boolean wantClientAuth, - final boolean needClientAuth, - final Collection<String> enabledCipherSuites, - final Collection<String> disabledCipherSuites, - final Runnable onTransportEncryptionAction) - { - _connection = connection; - _socketChannel = connection.getSocketChannel(); - _protocolEngine = protocolEngine; - _receiveBufSize = receiveBufSize; - _ticker = ticker; - _encryptionSet = encryptionSet; - _sslContext = sslContext; - _onTransportEncryptionAction = onTransportEncryptionAction; - - if(encryptionSet.size() == 1) - { - _transportEncryption = _encryptionSet.iterator().next(); - if (_transportEncryption == TransportEncryption.TLS) - { - onTransportEncryptionAction.run(); - } - } - - if(encryptionSet.contains(TransportEncryption.TLS)) - { - _sslEngine = _sslContext.createSSLEngine(); - _sslEngine.setUseClientMode(false); - SSLUtil.removeSSLv3Support(_sslEngine); - SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites); - - if(needClientAuth) - { - _sslEngine.setNeedClientAuth(true); - } - else if(wantClientAuth) - { - _sslEngine.setWantClientAuth(true); - } - _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2)); - } - - try - { - _remoteSocketAddress = _socketChannel.getRemoteAddress().toString(); - _socketChannel.configureBlocking(false); - } - catch (IOException e) - { - throw new SenderException("Unable to prepare the channel for non-blocking IO", e); - } - - } - - @Override - public void send(final ByteBuffer msg) - { - if (_closed.get()) - { - throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed"); - } - // append to list and do selector wakeup - _buffers.add(msg); - _stateChanged.set(true); - } - - - public boolean doWork() - { - _stateChanged.set(false); - boolean closed = _closed.get(); - if (!closed) - { - try - { - _workDone = false; - - long currentTime = System.currentTimeMillis(); - int tick = _ticker.getTimeToNextTick(currentTime); - if (tick <= 0) - { - _ticker.tick(currentTime); - } - - _protocolEngine.setMessageAssignmentSuspended(true); - - _protocolEngine.processPendingMessages(); - - _protocolEngine.setTransportBlockedForWriting(!doWrite()); - boolean dataRead = doRead(); - _fullyWritten = doWrite(); - _protocolEngine.setTransportBlockedForWriting(!_fullyWritten); - - if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)) - { - _stateChanged.set(true); - } - - // tell all consumer targets that it is okay to accept more - _protocolEngine.setMessageAssignmentSuspended(false); - } - catch (IOException e) - { - LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); - close(); - } - } - else - { - - if(!SystemUtils.isWindows()) - { - try - { - _socketChannel.shutdownInput(); - } - catch (IOException e) - { - LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e); - - } - } - try - { - while(!doWrite()) - { - } - } - catch (IOException e) - { - LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e); - - } - LOGGER.debug("Closing receiver"); - _protocolEngine.closed(); - - try - { - if(!SystemUtils.isWindows()) - { - _socketChannel.shutdownOutput(); - } - - _socketChannel.close(); - } - catch (IOException e) - { - LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e); - } - } - - return closed; - - } - - @Override - public void flush() - { - _stateChanged.set(true); - _connection.getSelector().wakeup(); - - } - - @Override - public void close() - { - LOGGER.debug("Closing " + _remoteSocketAddress); - if(_closed.compareAndSet(false,true)) - { - _stateChanged.set(true); - _connection.getSelector().wakeup(); - } - } - - private boolean doWrite() throws IOException - { - - ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; - Iterator<ByteBuffer> bufferIterator = _buffers.iterator(); - for (int i = 0; i < bufArray.length; i++) - { - bufArray[i] = bufferIterator.next(); - } - - int byteBuffersWritten = 0; - - if(_transportEncryption == TransportEncryption.NONE) - { - - - long written = _socketChannel.write(bufArray); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Written " + written + " bytes"); - } - - for (ByteBuffer buf : bufArray) - { - if (buf.remaining() == 0) - { - byteBuffersWritten++; - _buffers.poll(); - } - } - - - return bufArray.length == byteBuffersWritten; - } - else if(_transportEncryption == TransportEncryption.TLS) - { - int remaining = 0; - do - { - if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) - { - _workDone = true; - final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); - _status = _sslEngine.wrap(bufArray, netBuffer); - runSSLEngineTasks(_status); - - netBuffer.flip(); - remaining = netBuffer.remaining(); - if (remaining != 0) - { - _encryptedOutput.add(netBuffer); - } - for (ByteBuffer buf : bufArray) - { - if (buf.remaining() == 0) - { - byteBuffersWritten++; - _buffers.poll(); - } - } - } - - } - while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP); - ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); - long written = _socketChannel.write(encryptedBuffers); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Written " + written + " encrypted bytes"); - } - ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator(); - while(iter.hasNext()) - { - ByteBuffer buf = iter.next(); - if(buf.remaining() == 0) - { - iter.remove(); - } - else - { - break; - } - } - - return bufArray.length == byteBuffersWritten; - - } - else - { - return true; - } - } - - private boolean doRead() throws IOException - { - boolean readData = false; - if(_transportEncryption == TransportEncryption.NONE) - { - int remaining = 0; - while (remaining == 0 && !_closed.get()) - { - if (_currentBuffer == null || _currentBuffer.remaining() == 0) - { - _currentBuffer = ByteBuffer.allocate(_receiveBufSize); - } - int read = _socketChannel.read(_currentBuffer); - if(read > 0) - { - readData = true; - } - if (read == -1) - { - _closed.set(true); - } - remaining = _currentBuffer.remaining(); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Read " + read + " byte(s)"); - } - ByteBuffer dup = _currentBuffer.duplicate(); - dup.flip(); - _currentBuffer = _currentBuffer.slice(); - _protocolEngine.received(dup); - } - } - else if(_transportEncryption == TransportEncryption.TLS) - { - int read = 1; - while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED)) - { - read = _socketChannel.read(_netInputBuffer); - if (read == -1) - { - _closed.set(true); - } - else if(read > 0) - { - readData = true; - } - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Read " + read + " encrypted bytes "); - } - - _netInputBuffer.flip(); - - - int unwrapped = 0; - boolean tasksRun; - do - { - ByteBuffer appInputBuffer = - ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50); - - _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer); - tasksRun = runSSLEngineTasks(_status); - - appInputBuffer.flip(); - unwrapped = appInputBuffer.remaining(); - if(unwrapped > 0) - { - readData = true; - } - _protocolEngine.received(appInputBuffer); - } - while(unwrapped > 0 || tasksRun); - - _netInputBuffer.compact(); - - } - } - else - { - int read = 1; - while (!_closed.get() && read > 0) - { - - read = _socketChannel.read(_netInputBuffer); - if (read == -1) - { - _closed.set(true); - } - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer); - } - - if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK) - { - _netInputBuffer.flip(); - final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK]; - ByteBuffer dup = _netInputBuffer.duplicate(); - dup.get(headerBytes); - - _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE; - LOGGER.debug("Identified transport encryption as " + _transportEncryption); - - if (_transportEncryption == TransportEncryption.NONE) - { - _protocolEngine.received(_netInputBuffer); - } - else - { - _onTransportEncryptionAction.run(); - _netInputBuffer.compact(); - readData = doRead(); - } - break; - } - } - } - return readData; - } - - private boolean runSSLEngineTasks(final SSLEngineResult status) - { - if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) - { - Runnable task; - while((task = _sslEngine.getDelegatedTask()) != null) - { - task.run(); - } - return true; - } - return false; - } - - private boolean looksLikeSSL(byte[] headerBytes) - { - return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); - } - - private boolean looksLikeSSLv3ClientHello(byte[] headerBytes) - { - return headerBytes[0] == 22 && // SSL Handshake - (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[2] == 0 || // SSL 3.0 - headerBytes[2] == 1 || // TLS 1.0 - headerBytes[2] == 2 || // TLS 1.1 - headerBytes[2] == 3)) && // TLS1.2 - (headerBytes[5] == 1); // client_hello - } - - private boolean looksLikeSSLv2ClientHello(byte[] headerBytes) - { - return headerBytes[0] == -128 && - headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[4] == 0 || // SSL 3.0 - headerBytes[4] == 1 || // TLS 1.0 - headerBytes[4] == 2 || // TLS 1.1 - headerBytes[4] == 3); - } - - public Principal getPeerPrincipal() - { - - if (_sslEngine != null) - { - try - { - return _sslEngine.getSession().getPeerPrincipal(); - } - catch (SSLPeerUnverifiedException e) - { - return null; - } - } - - return null; - } - - public boolean canRead() - { - return true; - } - - public boolean waitingForWrite() - { - return !_fullyWritten; - } - - public boolean isStateChanged() - { - return _stateChanged.get(); - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java deleted file mode 100644 index bd8d3ad804..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.transport.network.io; - -import java.io.IOException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -/** -* Created by keith on 28/01/2015. -*/ -public class SelectorThread extends Thread -{ - private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); - private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); - private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); - private final Selector _selector; - private final AtomicBoolean _closed = new AtomicBoolean(); - private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); - private final NonBlockingNetworkTransport _transport; - - SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport) - { - super("SelectorThread-"+name); - _transport = nonBlockingNetworkTransport; - try - { - _selector = Selector.open(); - } - catch (IOException e) - { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - - public void addAcceptingSocket(final ServerSocketChannel socketChannel) - { - _tasks.add(new Runnable() - { - @Override - public void run() - { - - try - { - socketChannel.register(_selector, SelectionKey.OP_ACCEPT); - } - catch (ClosedChannelException e) - { - // TODO - e.printStackTrace(); - } - } - }); - _selector.wakeup(); - } - - public void cancelAcceptingSocket(final ServerSocketChannel socketChannel) - { - _tasks.add(new Runnable() - { - @Override - public void run() - { - SelectionKey selectionKey = socketChannel.keyFor(_selector); - if(selectionKey != null) - { - selectionKey.cancel(); - } - } - }); - _selector.wakeup(); - } - - @Override - public void run() - { - - long nextTimeout = 0; - - try - { - while (!_closed.get()) - { - - _selector.select(nextTimeout); - - while(_tasks.peek() != null) - { - Runnable task = _tasks.poll(); - task.run(); - } - - List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - - - Set<SelectionKey> selectionKeys = _selector.selectedKeys(); - for (SelectionKey key : selectionKeys) - { - if(key.isAcceptable()) - { - // todo - should we schedule this rather than running in this thread? - SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept(); - _transport.acceptSocketChannel(acceptedChannel); - } - else - { - NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); - - key.channel().register(_selector, 0); - - toBeScheduled.add(connection); - _unscheduledConnections.remove(connection); - } - - } - selectionKeys.clear(); - - while (_unregisteredConnections.peek() != null) - { - NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); - _unscheduledConnections.add(unregisteredConnection); - - - final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) - | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); - unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); - - } - - long currentTime = System.currentTimeMillis(); - Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); - nextTimeout = Integer.MAX_VALUE; - while (iterator.hasNext()) - { - NonBlockingConnection connection = iterator.next(); - - int period = connection.getTicker().getTimeToNextTick(currentTime); - if (period < 0 || connection.isStateChanged()) - { - toBeScheduled.add(connection); - connection.getSocketChannel().register(_selector, 0).cancel(); - iterator.remove(); - } - else - { - nextTimeout = Math.min(period, nextTimeout); - } - } - - for (NonBlockingConnection connection : toBeScheduled) - { - _scheduler.schedule(connection); - } - - } - } - catch (IOException e) - { - //TODO - e.printStackTrace(); - } - finally - { - try - { - _selector.close(); - } - catch (IOException e) - { - e.printStackTrace(); - } - } - - - - - } - - public void addConnection(final NonBlockingConnection connection) - { - _unregisteredConnections.add(connection); - _selector.wakeup(); - - } - - public void wakeup() - { - _selector.wakeup(); - } - - public void close() - { - _closed.set(true); - _selector.wakeup(); - _scheduler.close(); - } - - private class NetworkConnectionScheduler - { - private final ScheduledThreadPoolExecutor _executor; - private final AtomicInteger _running = new AtomicInteger(); - private final int _poolSize; - - private NetworkConnectionScheduler() - { - _poolSize = Runtime.getRuntime().availableProcessors(); - _executor = new ScheduledThreadPoolExecutor(_poolSize); - _executor.prestartAllCoreThreads(); - } - - public void processConnection(final NonBlockingConnection connection) - { - try - { - _running.incrementAndGet(); - boolean rerun; - do - { - rerun = false; - boolean closed = connection.doWork(); - - if (!closed) - { - - if (connection.isStateChanged()) - { - if (_running.get() == _poolSize) - { - schedule(connection); - } - else - { - rerun = true; - } - } - else - { - SelectorThread.this.addConnection(connection); - } - } - - } while (rerun); - } - finally - { - _running.decrementAndGet(); - } - } - - public void schedule(final NonBlockingConnection connection) - { - _executor.submit(new Runnable() - { - @Override - public void run() - { - processConnection(connection); - } - }); - } - - public void close() - { - _executor.shutdown(); - } - - - - } -} |
