summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-11 15:11:13 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-11 15:11:13 +0000
commit38194151e929fef7fa8adeb08badfa85a17d8404 (patch)
tree4db499ef5ef1f14a8506a123dffef2a79baeb8dc /qpid/java/common
parent503b33b9c8ae17a5dc77b8a306a120ea8cdb31d4 (diff)
downloadqpid-python-38194151e929fef7fa8adeb08badfa85a17d8404.tar.gz
refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java43
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java187
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java185
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java551
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java302
6 files changed, 1 insertions, 1269 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
deleted file mode 100644
index df4d7c7721..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.protocol;
-
-import javax.security.auth.Subject;
-
-public interface ServerProtocolEngine extends ProtocolEngine
-{
- /**
- * Gets the connection ID associated with this ProtocolEngine
- */
- long getConnectionId();
-
- Subject getSubject();
-
- boolean isTransportBlockedForWriting();
-
- void setTransportBlockedForWriting(boolean blocked);
-
- void setMessageAssignmentSuspended(boolean value);
-
- boolean isMessageAssignmentSuspended();
-
- void processPendingMessages();
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
index 54a2a360bb..71704fca3a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
@@ -25,7 +25,7 @@ import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportActivity;
-class IdleTimeoutTicker implements Ticker
+public class IdleTimeoutTicker implements Ticker
{
private final TransportActivity _transport;
private final int _defaultTimeout;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
deleted file mode 100644
index 7c7c6929d1..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
-*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.io;
-
-import java.net.SocketAddress;
-import java.nio.channels.SocketChannel;
-import java.security.Principal;
-import java.util.Collection;
-import java.util.Set;
-
-import javax.net.ssl.SSLContext;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.Ticker;
-import org.apache.qpid.transport.network.TransportEncryption;
-
-public class NonBlockingConnection implements NetworkConnection
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
- private final SocketChannel _socketChannel;
- private final long _timeout;
- private final NonBlockingSenderReceiver _nonBlockingSenderReceiver;
- private final Ticker _ticker;
- private final SelectorThread _selector;
- private int _maxReadIdle;
- private int _maxWriteIdle;
- private Principal _principal;
- private boolean _principalChecked;
- private final Object _lock = new Object();
-
- public NonBlockingConnection(SocketChannel socketChannel,
- ServerProtocolEngine delegate,
- int sendBufferSize,
- int receiveBufferSize,
- long timeout,
- Ticker ticker,
- final Set<TransportEncryption> encryptionSet,
- final SSLContext sslContext,
- final boolean wantClientAuth,
- final boolean needClientAuth,
- final Collection<String> enabledCipherSuites,
- final Collection<String> disabledCipherSuites,
- final Runnable onTransportEncryptionAction,
- final SelectorThread selectorThread)
- {
- _socketChannel = socketChannel;
- _timeout = timeout;
- _ticker = ticker;
- _selector = selectorThread;
-
- _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(this,
- delegate,
- receiveBufferSize,
- ticker,
- encryptionSet,
- sslContext,
- wantClientAuth,
- needClientAuth,
- enabledCipherSuites,
- disabledCipherSuites,
- onTransportEncryptionAction);
-
- }
-
-
- public Ticker getTicker()
- {
- return _ticker;
- }
-
- public SocketChannel getSocketChannel()
- {
- return _socketChannel;
- }
-
- public void start()
- {
- }
-
- public ByteBufferSender getSender()
- {
- return _nonBlockingSenderReceiver;
- }
-
- public void close()
- {
- _nonBlockingSenderReceiver.close();
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _socketChannel.socket().getRemoteSocketAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _socketChannel.socket().getLocalSocketAddress();
- }
-
- public void setMaxWriteIdle(int sec)
- {
- _maxWriteIdle = sec;
- }
-
- public void setMaxReadIdle(int sec)
- {
- _maxReadIdle = sec;
- }
-
- @Override
- public Principal getPeerPrincipal()
- {
- synchronized (_lock)
- {
- if(!_principalChecked)
- {
-
- _principal = _nonBlockingSenderReceiver.getPeerPrincipal();
-
- _principalChecked = true;
- }
-
- return _principal;
- }
- }
-
- @Override
- public int getMaxReadIdle()
- {
- return _maxReadIdle;
- }
-
- @Override
- public int getMaxWriteIdle()
- {
- return _maxWriteIdle;
- }
-
- public boolean canRead()
- {
- return _nonBlockingSenderReceiver.canRead();
- }
-
- public boolean waitingForWrite()
- {
- return _nonBlockingSenderReceiver.waitingForWrite();
- }
-
- public boolean isStateChanged()
- {
-
- return _nonBlockingSenderReceiver.isStateChanged();
- }
-
- public boolean doWork()
- {
- return _nonBlockingSenderReceiver.doWork();
- }
-
- public SelectorThread getSelector()
- {
- return _selector;
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
deleted file mode 100644
index 1c49efc294..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.io;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Set;
-
-import javax.net.ssl.SSLContext;
-
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.TransportEncryption;
-
-public class NonBlockingNetworkTransport
-{
-
- private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
- private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
- CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
- private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
- CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
- private SelectorThread _selector;
-
-
- private Set<TransportEncryption> _encryptionSet;
- private volatile boolean _closed = false;
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext;
- private ServerSocketChannel _serverSocket;
- private int _timeout;
-
- public void close()
- {
- if(_selector != null)
- {
- try
- {
- if (_serverSocket != null)
- {
- _selector.cancelAcceptingSocket(_serverSocket);
- _serverSocket.close();
- }
- }
- catch (IOException e)
- {
- // TODO
- e.printStackTrace();
- }
- finally
- {
-
- _selector.close();
- }
- }
- }
-
- public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext,
- final Set<TransportEncryption> encryptionSet)
- {
- try
- {
-
- _config = config;
- _factory = factory;
- _sslContext = sslContext;
- _timeout = TIMEOUT;
-
- InetSocketAddress address = config.getAddress();
-
- _serverSocket = ServerSocketChannel.open();
-
- _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
- _serverSocket.bind(address);
- _serverSocket.configureBlocking(false);
- _encryptionSet = encryptionSet;
-
- _selector = new SelectorThread(config.getAddress().toString(), this);
- _selector.start();
- _selector.addAcceptingSocket(_serverSocket);
- }
- catch (IOException e)
- {
- throw new TransportException("Failed to start AMQP on port : " + config, e);
- }
-
-
- }
-
- public int getAcceptingPort()
- {
- return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort();
- }
-
- public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
- {
- final ServerProtocolEngine engine =
- (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
- .getRemoteSocketAddress());
-
- if(engine != null)
- {
- socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
- socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
-
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
- socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
- socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
-
-
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
-
- NonBlockingConnection connection =
- new NonBlockingConnection(socketChannel,
- engine,
- sendBufferSize,
- receiveBufferSize,
- _timeout,
- ticker,
- _encryptionSet,
- _sslContext,
- _config.wantClientAuth(),
- _config.needClientAuth(),
- _config.getEnabledCipherSuites(),
- _config.getDisabledCipherSuites(),
- new Runnable()
- {
-
- @Override
- public void run()
- {
- engine.encryptedTransport();
- }
- },
- _selector);
-
- engine.setNetworkConnection(connection, connection.getSender());
- connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
-
- ticker.setConnection(connection);
-
- connection.start();
-
- _selector.addConnection(connection);
-
- }
- else
- {
- socketChannel.close();
- }
- }
-
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
deleted file mode 100644
index ee99233063..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ /dev/null
@@ -1,551 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.transport.network.io;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLPeerUnverifiedException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.SenderClosedException;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.Ticker;
-import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-import org.apache.qpid.util.SystemUtils;
-
-public class NonBlockingSenderReceiver implements ByteBufferSender
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
- public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
-
- private final SocketChannel _socketChannel;
-
- private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
- private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
-
- private final String _remoteSocketAddress;
- private final AtomicBoolean _closed = new AtomicBoolean(false);
- private final ServerProtocolEngine _protocolEngine;
- private final int _receiveBufSize;
- private final Ticker _ticker;
- private final Set<TransportEncryption> _encryptionSet;
- private final SSLContext _sslContext;
- private final Runnable _onTransportEncryptionAction;
- private final NonBlockingConnection _connection;
- private ByteBuffer _netInputBuffer;
- private SSLEngine _sslEngine;
-
- private ByteBuffer _currentBuffer;
-
- private TransportEncryption _transportEncryption;
- private SSLEngineResult _status;
- private volatile boolean _fullyWritten = true;
- private AtomicBoolean _stateChanged = new AtomicBoolean();
- private boolean _workDone;
-
-
- public NonBlockingSenderReceiver(final NonBlockingConnection connection,
- ServerProtocolEngine protocolEngine,
- int receiveBufSize,
- Ticker ticker,
- final Set<TransportEncryption> encryptionSet,
- final SSLContext sslContext,
- final boolean wantClientAuth,
- final boolean needClientAuth,
- final Collection<String> enabledCipherSuites,
- final Collection<String> disabledCipherSuites,
- final Runnable onTransportEncryptionAction)
- {
- _connection = connection;
- _socketChannel = connection.getSocketChannel();
- _protocolEngine = protocolEngine;
- _receiveBufSize = receiveBufSize;
- _ticker = ticker;
- _encryptionSet = encryptionSet;
- _sslContext = sslContext;
- _onTransportEncryptionAction = onTransportEncryptionAction;
-
- if(encryptionSet.size() == 1)
- {
- _transportEncryption = _encryptionSet.iterator().next();
- if (_transportEncryption == TransportEncryption.TLS)
- {
- onTransportEncryptionAction.run();
- }
- }
-
- if(encryptionSet.contains(TransportEncryption.TLS))
- {
- _sslEngine = _sslContext.createSSLEngine();
- _sslEngine.setUseClientMode(false);
- SSLUtil.removeSSLv3Support(_sslEngine);
- SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites);
-
- if(needClientAuth)
- {
- _sslEngine.setNeedClientAuth(true);
- }
- else if(wantClientAuth)
- {
- _sslEngine.setWantClientAuth(true);
- }
- _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
- }
-
- try
- {
- _remoteSocketAddress = _socketChannel.getRemoteAddress().toString();
- _socketChannel.configureBlocking(false);
- }
- catch (IOException e)
- {
- throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
- }
-
- }
-
- @Override
- public void send(final ByteBuffer msg)
- {
- if (_closed.get())
- {
- throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed");
- }
- // append to list and do selector wakeup
- _buffers.add(msg);
- _stateChanged.set(true);
- }
-
-
- public boolean doWork()
- {
- _stateChanged.set(false);
- boolean closed = _closed.get();
- if (!closed)
- {
- try
- {
- _workDone = false;
-
- long currentTime = System.currentTimeMillis();
- int tick = _ticker.getTimeToNextTick(currentTime);
- if (tick <= 0)
- {
- _ticker.tick(currentTime);
- }
-
- _protocolEngine.setMessageAssignmentSuspended(true);
-
- _protocolEngine.processPendingMessages();
-
- _protocolEngine.setTransportBlockedForWriting(!doWrite());
- boolean dataRead = doRead();
- _fullyWritten = doWrite();
- _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
-
- if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0))
- {
- _stateChanged.set(true);
- }
-
- // tell all consumer targets that it is okay to accept more
- _protocolEngine.setMessageAssignmentSuspended(false);
- }
- catch (IOException e)
- {
- LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
- close();
- }
- }
- else
- {
-
- if(!SystemUtils.isWindows())
- {
- try
- {
- _socketChannel.shutdownInput();
- }
- catch (IOException e)
- {
- LOGGER.info("Exception shutting down input for thread '" + _remoteSocketAddress + "': " + e);
-
- }
- }
- try
- {
- while(!doWrite())
- {
- }
- }
- catch (IOException e)
- {
- LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
-
- }
- LOGGER.debug("Closing receiver");
- _protocolEngine.closed();
-
- try
- {
- if(!SystemUtils.isWindows())
- {
- _socketChannel.shutdownOutput();
- }
-
- _socketChannel.close();
- }
- catch (IOException e)
- {
- LOGGER.info("Exception closing socket thread '" + _remoteSocketAddress + "': " + e);
- }
- }
-
- return closed;
-
- }
-
- @Override
- public void flush()
- {
- _stateChanged.set(true);
- _connection.getSelector().wakeup();
-
- }
-
- @Override
- public void close()
- {
- LOGGER.debug("Closing " + _remoteSocketAddress);
- if(_closed.compareAndSet(false,true))
- {
- _stateChanged.set(true);
- _connection.getSelector().wakeup();
- }
- }
-
- private boolean doWrite() throws IOException
- {
-
- ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
- Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
- for (int i = 0; i < bufArray.length; i++)
- {
- bufArray[i] = bufferIterator.next();
- }
-
- int byteBuffersWritten = 0;
-
- if(_transportEncryption == TransportEncryption.NONE)
- {
-
-
- long written = _socketChannel.write(bufArray);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Written " + written + " bytes");
- }
-
- for (ByteBuffer buf : bufArray)
- {
- if (buf.remaining() == 0)
- {
- byteBuffersWritten++;
- _buffers.poll();
- }
- }
-
-
- return bufArray.length == byteBuffersWritten;
- }
- else if(_transportEncryption == TransportEncryption.TLS)
- {
- int remaining = 0;
- do
- {
- if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
- {
- _workDone = true;
- final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
- _status = _sslEngine.wrap(bufArray, netBuffer);
- runSSLEngineTasks(_status);
-
- netBuffer.flip();
- remaining = netBuffer.remaining();
- if (remaining != 0)
- {
- _encryptedOutput.add(netBuffer);
- }
- for (ByteBuffer buf : bufArray)
- {
- if (buf.remaining() == 0)
- {
- byteBuffersWritten++;
- _buffers.poll();
- }
- }
- }
-
- }
- while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
- ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
- long written = _socketChannel.write(encryptedBuffers);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Written " + written + " encrypted bytes");
- }
- ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
- while(iter.hasNext())
- {
- ByteBuffer buf = iter.next();
- if(buf.remaining() == 0)
- {
- iter.remove();
- }
- else
- {
- break;
- }
- }
-
- return bufArray.length == byteBuffersWritten;
-
- }
- else
- {
- return true;
- }
- }
-
- private boolean doRead() throws IOException
- {
- boolean readData = false;
- if(_transportEncryption == TransportEncryption.NONE)
- {
- int remaining = 0;
- while (remaining == 0 && !_closed.get())
- {
- if (_currentBuffer == null || _currentBuffer.remaining() == 0)
- {
- _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
- }
- int read = _socketChannel.read(_currentBuffer);
- if(read > 0)
- {
- readData = true;
- }
- if (read == -1)
- {
- _closed.set(true);
- }
- remaining = _currentBuffer.remaining();
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Read " + read + " byte(s)");
- }
- ByteBuffer dup = _currentBuffer.duplicate();
- dup.flip();
- _currentBuffer = _currentBuffer.slice();
- _protocolEngine.received(dup);
- }
- }
- else if(_transportEncryption == TransportEncryption.TLS)
- {
- int read = 1;
- while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
- {
- read = _socketChannel.read(_netInputBuffer);
- if (read == -1)
- {
- _closed.set(true);
- }
- else if(read > 0)
- {
- readData = true;
- }
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Read " + read + " encrypted bytes ");
- }
-
- _netInputBuffer.flip();
-
-
- int unwrapped = 0;
- boolean tasksRun;
- do
- {
- ByteBuffer appInputBuffer =
- ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
-
- _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
- tasksRun = runSSLEngineTasks(_status);
-
- appInputBuffer.flip();
- unwrapped = appInputBuffer.remaining();
- if(unwrapped > 0)
- {
- readData = true;
- }
- _protocolEngine.received(appInputBuffer);
- }
- while(unwrapped > 0 || tasksRun);
-
- _netInputBuffer.compact();
-
- }
- }
- else
- {
- int read = 1;
- while (!_closed.get() && read > 0)
- {
-
- read = _socketChannel.read(_netInputBuffer);
- if (read == -1)
- {
- _closed.set(true);
- }
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
- }
-
- if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
- {
- _netInputBuffer.flip();
- final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
- ByteBuffer dup = _netInputBuffer.duplicate();
- dup.get(headerBytes);
-
- _transportEncryption = looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE;
- LOGGER.debug("Identified transport encryption as " + _transportEncryption);
-
- if (_transportEncryption == TransportEncryption.NONE)
- {
- _protocolEngine.received(_netInputBuffer);
- }
- else
- {
- _onTransportEncryptionAction.run();
- _netInputBuffer.compact();
- readData = doRead();
- }
- break;
- }
- }
- }
- return readData;
- }
-
- private boolean runSSLEngineTasks(final SSLEngineResult status)
- {
- if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
- {
- Runnable task;
- while((task = _sslEngine.getDelegatedTask()) != null)
- {
- task.run();
- }
- return true;
- }
- return false;
- }
-
- private boolean looksLikeSSL(byte[] headerBytes)
- {
- return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
- }
-
- private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == 22 && // SSL Handshake
- (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[2] == 0 || // SSL 3.0
- headerBytes[2] == 1 || // TLS 1.0
- headerBytes[2] == 2 || // TLS 1.1
- headerBytes[2] == 3)) && // TLS1.2
- (headerBytes[5] == 1); // client_hello
- }
-
- private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == -128 &&
- headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[4] == 0 || // SSL 3.0
- headerBytes[4] == 1 || // TLS 1.0
- headerBytes[4] == 2 || // TLS 1.1
- headerBytes[4] == 3);
- }
-
- public Principal getPeerPrincipal()
- {
-
- if (_sslEngine != null)
- {
- try
- {
- return _sslEngine.getSession().getPeerPrincipal();
- }
- catch (SSLPeerUnverifiedException e)
- {
- return null;
- }
- }
-
- return null;
- }
-
- public boolean canRead()
- {
- return true;
- }
-
- public boolean waitingForWrite()
- {
- return !_fullyWritten;
- }
-
- public boolean isStateChanged()
- {
- return _stateChanged.get();
- }
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
deleted file mode 100644
index bd8d3ad804..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.transport.network.io;
-
-import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
-* Created by keith on 28/01/2015.
-*/
-public class SelectorThread extends Thread
-{
- private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
- private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
- private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
- private final Selector _selector;
- private final AtomicBoolean _closed = new AtomicBoolean();
- private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
- private final NonBlockingNetworkTransport _transport;
-
- SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport)
- {
- super("SelectorThread-"+name);
- _transport = nonBlockingNetworkTransport;
- try
- {
- _selector = Selector.open();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- public void addAcceptingSocket(final ServerSocketChannel socketChannel)
- {
- _tasks.add(new Runnable()
- {
- @Override
- public void run()
- {
-
- try
- {
- socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
- }
- catch (ClosedChannelException e)
- {
- // TODO
- e.printStackTrace();
- }
- }
- });
- _selector.wakeup();
- }
-
- public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
- {
- _tasks.add(new Runnable()
- {
- @Override
- public void run()
- {
- SelectionKey selectionKey = socketChannel.keyFor(_selector);
- if(selectionKey != null)
- {
- selectionKey.cancel();
- }
- }
- });
- _selector.wakeup();
- }
-
- @Override
- public void run()
- {
-
- long nextTimeout = 0;
-
- try
- {
- while (!_closed.get())
- {
-
- _selector.select(nextTimeout);
-
- while(_tasks.peek() != null)
- {
- Runnable task = _tasks.poll();
- task.run();
- }
-
- List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
-
- Set<SelectionKey> selectionKeys = _selector.selectedKeys();
- for (SelectionKey key : selectionKeys)
- {
- if(key.isAcceptable())
- {
- // todo - should we schedule this rather than running in this thread?
- SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept();
- _transport.acceptSocketChannel(acceptedChannel);
- }
- else
- {
- NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
-
- key.channel().register(_selector, 0);
-
- toBeScheduled.add(connection);
- _unscheduledConnections.remove(connection);
- }
-
- }
- selectionKeys.clear();
-
- while (_unregisteredConnections.peek() != null)
- {
- NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
- _unscheduledConnections.add(unregisteredConnection);
-
-
- final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
- | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
- unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
-
- }
-
- long currentTime = System.currentTimeMillis();
- Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
- nextTimeout = Integer.MAX_VALUE;
- while (iterator.hasNext())
- {
- NonBlockingConnection connection = iterator.next();
-
- int period = connection.getTicker().getTimeToNextTick(currentTime);
- if (period < 0 || connection.isStateChanged())
- {
- toBeScheduled.add(connection);
- connection.getSocketChannel().register(_selector, 0).cancel();
- iterator.remove();
- }
- else
- {
- nextTimeout = Math.min(period, nextTimeout);
- }
- }
-
- for (NonBlockingConnection connection : toBeScheduled)
- {
- _scheduler.schedule(connection);
- }
-
- }
- }
- catch (IOException e)
- {
- //TODO
- e.printStackTrace();
- }
- finally
- {
- try
- {
- _selector.close();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
-
-
-
-
- }
-
- public void addConnection(final NonBlockingConnection connection)
- {
- _unregisteredConnections.add(connection);
- _selector.wakeup();
-
- }
-
- public void wakeup()
- {
- _selector.wakeup();
- }
-
- public void close()
- {
- _closed.set(true);
- _selector.wakeup();
- _scheduler.close();
- }
-
- private class NetworkConnectionScheduler
- {
- private final ScheduledThreadPoolExecutor _executor;
- private final AtomicInteger _running = new AtomicInteger();
- private final int _poolSize;
-
- private NetworkConnectionScheduler()
- {
- _poolSize = Runtime.getRuntime().availableProcessors();
- _executor = new ScheduledThreadPoolExecutor(_poolSize);
- _executor.prestartAllCoreThreads();
- }
-
- public void processConnection(final NonBlockingConnection connection)
- {
- try
- {
- _running.incrementAndGet();
- boolean rerun;
- do
- {
- rerun = false;
- boolean closed = connection.doWork();
-
- if (!closed)
- {
-
- if (connection.isStateChanged())
- {
- if (_running.get() == _poolSize)
- {
- schedule(connection);
- }
- else
- {
- rerun = true;
- }
- }
- else
- {
- SelectorThread.this.addConnection(connection);
- }
- }
-
- } while (rerun);
- }
- finally
- {
- _running.decrementAndGet();
- }
- }
-
- public void schedule(final NonBlockingConnection connection)
- {
- _executor.submit(new Runnable()
- {
- @Override
- public void run()
- {
- processConnection(connection);
- }
- });
- }
-
- public void close()
- {
- _executor.shutdown();
- }
-
-
-
- }
-}