summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-11 15:11:13 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-11 15:11:13 +0000
commit38194151e929fef7fa8adeb08badfa85a17d8404 (patch)
tree4db499ef5ef1f14a8506a123dffef2a79baeb8dc /qpid/java
parent503b33b9c8ae17a5dc77b8a306a120ea8cdb31d4 (diff)
downloadqpid-python-38194151e929fef7fa8adeb08badfa85a17d8404.tar.gz
refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java)439
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java)6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java)2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java1
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java2
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java43
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java187
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java1
27 files changed, 285 insertions, 433 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
index 6e1b6529d8..6100a2eb80 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
@@ -19,7 +19,7 @@ package org.apache.qpid.server.plugin;/*
*
*/
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 946992cbb6..3b7883b9b9 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -30,7 +30,6 @@ import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index ee99233063..331c2e697d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -1,4 +1,5 @@
/*
+*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,11 +16,12 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
-
-package org.apache.qpid.transport.network.io;
+package org.apache.qpid.server.transport;
import java.io.IOException;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.security.Principal;
@@ -40,21 +42,30 @@ import javax.net.ssl.SSLPeerUnverifiedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
import org.apache.qpid.util.SystemUtils;
-public class NonBlockingSenderReceiver implements ByteBufferSender
+public class NonBlockingConnection implements NetworkConnection, ByteBufferSender
{
- private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
- public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
private final SocketChannel _socketChannel;
+ private final long _timeout;
+ private final Ticker _ticker;
+ private final SelectorThread _selector;
+ private int _maxReadIdle;
+ private int _maxWriteIdle;
+ private Principal _principal;
+ private boolean _principalChecked;
+ private final Object _lock = new Object();
+
+ public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
@@ -63,11 +74,9 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final ServerProtocolEngine _protocolEngine;
private final int _receiveBufSize;
- private final Ticker _ticker;
private final Set<TransportEncryption> _encryptionSet;
private final SSLContext _sslContext;
private final Runnable _onTransportEncryptionAction;
- private final NonBlockingConnection _connection;
private ByteBuffer _netInputBuffer;
private SSLEngine _sslEngine;
@@ -80,23 +89,28 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
private boolean _workDone;
- public NonBlockingSenderReceiver(final NonBlockingConnection connection,
- ServerProtocolEngine protocolEngine,
- int receiveBufSize,
- Ticker ticker,
- final Set<TransportEncryption> encryptionSet,
- final SSLContext sslContext,
- final boolean wantClientAuth,
- final boolean needClientAuth,
- final Collection<String> enabledCipherSuites,
- final Collection<String> disabledCipherSuites,
- final Runnable onTransportEncryptionAction)
+ public NonBlockingConnection(SocketChannel socketChannel,
+ ServerProtocolEngine delegate,
+ int sendBufferSize,
+ int receiveBufferSize,
+ long timeout,
+ Ticker ticker,
+ final Set<TransportEncryption> encryptionSet,
+ final SSLContext sslContext,
+ final boolean wantClientAuth,
+ final boolean needClientAuth,
+ final Collection<String> enabledCipherSuites,
+ final Collection<String> disabledCipherSuites,
+ final Runnable onTransportEncryptionAction,
+ final SelectorThread selectorThread)
{
- _connection = connection;
- _socketChannel = connection.getSocketChannel();
- _protocolEngine = protocolEngine;
- _receiveBufSize = receiveBufSize;
+ _socketChannel = socketChannel;
+ _timeout = timeout;
_ticker = ticker;
+ _selector = selectorThread;
+
+ _protocolEngine = delegate;
+ _receiveBufSize = receiveBufferSize;
_encryptionSet = encryptionSet;
_sslContext = sslContext;
_onTransportEncryptionAction = onTransportEncryptionAction;
@@ -138,20 +152,112 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
}
+
+ }
+
+
+ public Ticker getTicker()
+ {
+ return _ticker;
+ }
+
+ public SocketChannel getSocketChannel()
+ {
+ return _socketChannel;
+ }
+
+ public void start()
+ {
+ }
+
+ public ByteBufferSender getSender()
+ {
+ return this;
+ }
+
+ public void close()
+ {
+ LOGGER.debug("Closing " + _remoteSocketAddress);
+ if(_closed.compareAndSet(false,true))
+ {
+ _stateChanged.set(true);
+ getSelector().wakeup();
+ }
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _socketChannel.socket().getRemoteSocketAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _socketChannel.socket().getLocalSocketAddress();
+ }
+
+ public void setMaxWriteIdle(int sec)
+ {
+ _maxWriteIdle = sec;
+ }
+
+ public void setMaxReadIdle(int sec)
+ {
+ _maxReadIdle = sec;
}
@Override
- public void send(final ByteBuffer msg)
+ public Principal getPeerPrincipal()
{
- if (_closed.get())
+ synchronized (_lock)
{
- throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed");
+ if(!_principalChecked)
+ {
+ if (_sslEngine != null)
+ {
+ try
+ {
+ _principal = _sslEngine.getSession().getPeerPrincipal();
+ }
+ catch (SSLPeerUnverifiedException e)
+ {
+ return null;
+ }
+ }
+
+ _principalChecked = true;
+ }
+
+ return _principal;
}
- // append to list and do selector wakeup
- _buffers.add(msg);
- _stateChanged.set(true);
}
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
+
+ public boolean canRead()
+ {
+ return true;
+ }
+
+ public boolean waitingForWrite()
+ {
+ return !_fullyWritten;
+ }
+
+ public boolean isStateChanged()
+ {
+
+ return _stateChanged.get();
+ }
public boolean doWork()
{
@@ -190,7 +296,12 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
catch (IOException e)
{
LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
- close();
+ LOGGER.debug("Closing " + _remoteSocketAddress);
+ if(_closed.compareAndSet(false,true))
+ {
+ _stateChanged.set(true);
+ getSelector().wakeup();
+ }
}
}
else
@@ -241,119 +352,22 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
}
- @Override
- public void flush()
+ public SelectorThread getSelector()
{
- _stateChanged.set(true);
- _connection.getSelector().wakeup();
-
+ return _selector;
}
- @Override
- public void close()
+ public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
{
- LOGGER.debug("Closing " + _remoteSocketAddress);
- if(_closed.compareAndSet(false,true))
- {
- _stateChanged.set(true);
- _connection.getSelector().wakeup();
- }
- }
-
- private boolean doWrite() throws IOException
- {
-
- ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
- Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
- for (int i = 0; i < bufArray.length; i++)
- {
- bufArray[i] = bufferIterator.next();
- }
-
- int byteBuffersWritten = 0;
-
- if(_transportEncryption == TransportEncryption.NONE)
- {
-
-
- long written = _socketChannel.write(bufArray);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Written " + written + " bytes");
- }
-
- for (ByteBuffer buf : bufArray)
- {
- if (buf.remaining() == 0)
- {
- byteBuffersWritten++;
- _buffers.poll();
- }
- }
-
-
- return bufArray.length == byteBuffersWritten;
- }
- else if(_transportEncryption == TransportEncryption.TLS)
- {
- int remaining = 0;
- do
- {
- if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
- {
- _workDone = true;
- final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
- _status = _sslEngine.wrap(bufArray, netBuffer);
- runSSLEngineTasks(_status);
-
- netBuffer.flip();
- remaining = netBuffer.remaining();
- if (remaining != 0)
- {
- _encryptedOutput.add(netBuffer);
- }
- for (ByteBuffer buf : bufArray)
- {
- if (buf.remaining() == 0)
- {
- byteBuffersWritten++;
- _buffers.poll();
- }
- }
- }
-
- }
- while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
- ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
- long written = _socketChannel.write(encryptedBuffers);
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Written " + written + " encrypted bytes");
- }
- ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
- while(iter.hasNext())
- {
- ByteBuffer buf = iter.next();
- if(buf.remaining() == 0)
- {
- iter.remove();
- }
- else
- {
- break;
- }
- }
-
- return bufArray.length == byteBuffersWritten;
-
- }
- else
- {
- return true;
- }
+ return headerBytes[0] == -128 &&
+ headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[4] == 0 || // SSL 3.0
+ headerBytes[4] == 1 || // TLS 1.0
+ headerBytes[4] == 2 || // TLS 1.1
+ headerBytes[4] == 3);
}
- private boolean doRead() throws IOException
+ public boolean doRead() throws IOException
{
boolean readData = false;
if(_transportEncryption == TransportEncryption.NONE)
@@ -475,26 +489,100 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
return readData;
}
- private boolean runSSLEngineTasks(final SSLEngineResult status)
+ public boolean doWrite() throws IOException
{
- if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+
+ ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
+ Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
+ for (int i = 0; i < bufArray.length; i++)
{
- Runnable task;
- while((task = _sslEngine.getDelegatedTask()) != null)
+ bufArray[i] = bufferIterator.next();
+ }
+
+ int byteBuffersWritten = 0;
+
+ if(_transportEncryption == TransportEncryption.NONE)
+ {
+
+
+ long written = _socketChannel.write(bufArray);
+ if (LOGGER.isDebugEnabled())
{
- task.run();
+ LOGGER.debug("Written " + written + " bytes");
}
- return true;
+
+ for (ByteBuffer buf : bufArray)
+ {
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _buffers.poll();
+ }
+ }
+
+
+ return bufArray.length == byteBuffersWritten;
}
- return false;
- }
+ else if(_transportEncryption == TransportEncryption.TLS)
+ {
+ int remaining = 0;
+ do
+ {
+ if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
+ {
+ _workDone = true;
+ final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+ _status = _sslEngine.wrap(bufArray, netBuffer);
+ runSSLEngineTasks(_status);
- private boolean looksLikeSSL(byte[] headerBytes)
- {
- return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
+ netBuffer.flip();
+ remaining = netBuffer.remaining();
+ if (remaining != 0)
+ {
+ _encryptedOutput.add(netBuffer);
+ }
+ for (ByteBuffer buf : bufArray)
+ {
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _buffers.poll();
+ }
+ }
+ }
+
+ }
+ while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+ ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
+ long written = _socketChannel.write(encryptedBuffers);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + written + " encrypted bytes");
+ }
+ ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
+ while(iter.hasNext())
+ {
+ ByteBuffer buf = iter.next();
+ if(buf.remaining() == 0)
+ {
+ iter.remove();
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return bufArray.length == byteBuffersWritten;
+
+ }
+ else
+ {
+ return true;
+ }
}
- private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
+ public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
{
return headerBytes[0] == 22 && // SSL Handshake
(headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
@@ -505,47 +593,42 @@ public class NonBlockingSenderReceiver implements ByteBufferSender
(headerBytes[5] == 1); // client_hello
}
- private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
- {
- return headerBytes[0] == -128 &&
- headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[4] == 0 || // SSL 3.0
- headerBytes[4] == 1 || // TLS 1.0
- headerBytes[4] == 2 || // TLS 1.1
- headerBytes[4] == 3);
- }
-
- public Principal getPeerPrincipal()
+ public boolean runSSLEngineTasks(final SSLEngineResult status)
{
-
- if (_sslEngine != null)
+ if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
{
- try
- {
- return _sslEngine.getSession().getPeerPrincipal();
- }
- catch (SSLPeerUnverifiedException e)
+ Runnable task;
+ while((task = _sslEngine.getDelegatedTask()) != null)
{
- return null;
+ task.run();
}
+ return true;
}
-
- return null;
+ return false;
}
- public boolean canRead()
+ public boolean looksLikeSSL(final byte[] headerBytes)
{
- return true;
+ return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
}
- public boolean waitingForWrite()
+ @Override
+ public void send(final ByteBuffer msg)
{
- return !_fullyWritten;
+ if (_closed.get())
+ {
+ throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed");
+ }
+ // append to list and do selector wakeup
+ _buffers.add(msg);
+ _stateChanged.set(true);
}
- public boolean isStateChanged()
+ @Override
+ public void flush()
{
- return _stateChanged.get();
- }
+ _stateChanged.set(true);
+ getSelector().wakeup();
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
index 1c49efc294..79313712a5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.transport.network.io;
+package org.apache.qpid.server.transport;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -33,10 +33,12 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.io.AbstractNetworkTransport;
+import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
public class NonBlockingNetworkTransport
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
index bd8d3ad804..786f1915a7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.qpid.transport.network.io;
+package org.apache.qpid.server.transport;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
index 5f5467db07..7874437a2f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
@@ -36,7 +36,6 @@ import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
class TCPandSSLTransport implements AcceptingTransport
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
index e670c1f88b..dd43ae7e11 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
index 5c919252b8..4231045afd 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
@@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 75a162deb8..5f227e5f18 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -29,7 +29,7 @@ import javax.security.auth.Subject;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Consumer;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index d9b4495d6e..a2f1f1a4ba 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 8fdee7a0f7..d33297fbf6 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index e11d2ce9bb..a7b08e3f83 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
index b05edc5d04..b9f013d253 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
@@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.test.utils.QpidTestCase;
public class WindowCreditManagerTest extends QpidTestCase
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 945e18e560..ed075038e6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -58,7 +58,7 @@ import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
index 2d32617106..6e5aab2dd5 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
@@ -19,7 +19,7 @@
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
public class NoAckCreditManager extends AbstractFlowCreditManager
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
index e63645ed09..a869a707e1 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.flow.AbstractFlowCreditManager;
import org.apache.qpid.server.flow.FlowCreditManager;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
index 0058fe86a9..e8cf028069 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
index 7253111114..8817e79aff 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
index e72cc4d058..af37b17d85 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index b6c23dff7a..a442b5c437 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -44,7 +44,7 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.End;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Broker;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 589bd0ec04..ebd23f31ae 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -40,7 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
index fa8134cb55..e72dc17b57 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 22cf76a3ea..d361dce682 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -52,7 +52,7 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Consumer;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
deleted file mode 100644
index df4d7c7721..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.protocol;
-
-import javax.security.auth.Subject;
-
-public interface ServerProtocolEngine extends ProtocolEngine
-{
- /**
- * Gets the connection ID associated with this ProtocolEngine
- */
- long getConnectionId();
-
- Subject getSubject();
-
- boolean isTransportBlockedForWriting();
-
- void setTransportBlockedForWriting(boolean blocked);
-
- void setMessageAssignmentSuspended(boolean value);
-
- boolean isMessageAssignmentSuspended();
-
- void processPendingMessages();
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
index 54a2a360bb..71704fca3a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
@@ -25,7 +25,7 @@ import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportActivity;
-class IdleTimeoutTicker implements Ticker
+public class IdleTimeoutTicker implements Ticker
{
private final TransportActivity _transport;
private final int _defaultTimeout;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
deleted file mode 100644
index 7c7c6929d1..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
-*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.io;
-
-import java.net.SocketAddress;
-import java.nio.channels.SocketChannel;
-import java.security.Principal;
-import java.util.Collection;
-import java.util.Set;
-
-import javax.net.ssl.SSLContext;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.Ticker;
-import org.apache.qpid.transport.network.TransportEncryption;
-
-public class NonBlockingConnection implements NetworkConnection
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
- private final SocketChannel _socketChannel;
- private final long _timeout;
- private final NonBlockingSenderReceiver _nonBlockingSenderReceiver;
- private final Ticker _ticker;
- private final SelectorThread _selector;
- private int _maxReadIdle;
- private int _maxWriteIdle;
- private Principal _principal;
- private boolean _principalChecked;
- private final Object _lock = new Object();
-
- public NonBlockingConnection(SocketChannel socketChannel,
- ServerProtocolEngine delegate,
- int sendBufferSize,
- int receiveBufferSize,
- long timeout,
- Ticker ticker,
- final Set<TransportEncryption> encryptionSet,
- final SSLContext sslContext,
- final boolean wantClientAuth,
- final boolean needClientAuth,
- final Collection<String> enabledCipherSuites,
- final Collection<String> disabledCipherSuites,
- final Runnable onTransportEncryptionAction,
- final SelectorThread selectorThread)
- {
- _socketChannel = socketChannel;
- _timeout = timeout;
- _ticker = ticker;
- _selector = selectorThread;
-
- _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(this,
- delegate,
- receiveBufferSize,
- ticker,
- encryptionSet,
- sslContext,
- wantClientAuth,
- needClientAuth,
- enabledCipherSuites,
- disabledCipherSuites,
- onTransportEncryptionAction);
-
- }
-
-
- public Ticker getTicker()
- {
- return _ticker;
- }
-
- public SocketChannel getSocketChannel()
- {
- return _socketChannel;
- }
-
- public void start()
- {
- }
-
- public ByteBufferSender getSender()
- {
- return _nonBlockingSenderReceiver;
- }
-
- public void close()
- {
- _nonBlockingSenderReceiver.close();
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _socketChannel.socket().getRemoteSocketAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _socketChannel.socket().getLocalSocketAddress();
- }
-
- public void setMaxWriteIdle(int sec)
- {
- _maxWriteIdle = sec;
- }
-
- public void setMaxReadIdle(int sec)
- {
- _maxReadIdle = sec;
- }
-
- @Override
- public Principal getPeerPrincipal()
- {
- synchronized (_lock)
- {
- if(!_principalChecked)
- {
-
- _principal = _nonBlockingSenderReceiver.getPeerPrincipal();
-
- _principalChecked = true;
- }
-
- return _principal;
- }
- }
-
- @Override
- public int getMaxReadIdle()
- {
- return _maxReadIdle;
- }
-
- @Override
- public int getMaxWriteIdle()
- {
- return _maxWriteIdle;
- }
-
- public boolean canRead()
- {
- return _nonBlockingSenderReceiver.canRead();
- }
-
- public boolean waitingForWrite()
- {
- return _nonBlockingSenderReceiver.waitingForWrite();
- }
-
- public boolean isStateChanged()
- {
-
- return _nonBlockingSenderReceiver.isStateChanged();
- }
-
- public boolean doWork()
- {
- return _nonBlockingSenderReceiver.doWork();
- }
-
- public SelectorThread getSelector()
- {
- return _selector;
- }
-}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index 84eb761899..a1e30ac83e 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -33,7 +33,6 @@ import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
-import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;