diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 15:11:13 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 15:11:13 +0000 |
| commit | 38194151e929fef7fa8adeb08badfa85a17d8404 (patch) | |
| tree | 4db499ef5ef1f14a8506a123dffef2a79baeb8dc /qpid/java | |
| parent | 503b33b9c8ae17a5dc77b8a306a120ea8cdb31d4 (diff) | |
| download | qpid-python-38194151e929fef7fa8adeb08badfa85a17d8404.tar.gz | |
refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
27 files changed, 285 insertions, 433 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java index 6e1b6529d8..6100a2eb80 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java @@ -19,7 +19,7 @@ package org.apache.qpid.server.plugin;/* * */ -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 946992cbb6..3b7883b9b9 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -30,7 +30,6 @@ import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java index ee99233063..331c2e697d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -1,4 +1,5 @@ /* +* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,11 +16,12 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. + * */ - -package org.apache.qpid.transport.network.io; +package org.apache.qpid.server.transport; import java.io.IOException; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.security.Principal; @@ -40,21 +42,30 @@ import javax.net.ssl.SSLPeerUnverifiedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.TransportEncryption; import org.apache.qpid.transport.network.security.ssl.SSLUtil; import org.apache.qpid.util.SystemUtils; -public class NonBlockingSenderReceiver implements ByteBufferSender +public class NonBlockingConnection implements NetworkConnection, ByteBufferSender { - private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class); - public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; - + private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); private final SocketChannel _socketChannel; + private final long _timeout; + private final Ticker _ticker; + private final SelectorThread _selector; + private int _maxReadIdle; + private int _maxWriteIdle; + private Principal _principal; + private boolean _principalChecked; + private final Object _lock = new Object(); + + public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); private final List<ByteBuffer> _encryptedOutput = new ArrayList<>(); @@ -63,11 +74,9 @@ public class NonBlockingSenderReceiver implements ByteBufferSender private final AtomicBoolean _closed = new AtomicBoolean(false); private final ServerProtocolEngine _protocolEngine; private final int _receiveBufSize; - private final Ticker _ticker; private final Set<TransportEncryption> _encryptionSet; private final SSLContext _sslContext; private final Runnable _onTransportEncryptionAction; - private final NonBlockingConnection _connection; private ByteBuffer _netInputBuffer; private SSLEngine _sslEngine; @@ -80,23 +89,28 @@ public class NonBlockingSenderReceiver implements ByteBufferSender private boolean _workDone; - public NonBlockingSenderReceiver(final NonBlockingConnection connection, - ServerProtocolEngine protocolEngine, - int receiveBufSize, - Ticker ticker, - final Set<TransportEncryption> encryptionSet, - final SSLContext sslContext, - final boolean wantClientAuth, - final boolean needClientAuth, - final Collection<String> enabledCipherSuites, - final Collection<String> disabledCipherSuites, - final Runnable onTransportEncryptionAction) + public NonBlockingConnection(SocketChannel socketChannel, + ServerProtocolEngine delegate, + int sendBufferSize, + int receiveBufferSize, + long timeout, + Ticker ticker, + final Set<TransportEncryption> encryptionSet, + final SSLContext sslContext, + final boolean wantClientAuth, + final boolean needClientAuth, + final Collection<String> enabledCipherSuites, + final Collection<String> disabledCipherSuites, + final Runnable onTransportEncryptionAction, + final SelectorThread selectorThread) { - _connection = connection; - _socketChannel = connection.getSocketChannel(); - _protocolEngine = protocolEngine; - _receiveBufSize = receiveBufSize; + _socketChannel = socketChannel; + _timeout = timeout; _ticker = ticker; + _selector = selectorThread; + + _protocolEngine = delegate; + _receiveBufSize = receiveBufferSize; _encryptionSet = encryptionSet; _sslContext = sslContext; _onTransportEncryptionAction = onTransportEncryptionAction; @@ -138,20 +152,112 @@ public class NonBlockingSenderReceiver implements ByteBufferSender throw new SenderException("Unable to prepare the channel for non-blocking IO", e); } + + } + + + public Ticker getTicker() + { + return _ticker; + } + + public SocketChannel getSocketChannel() + { + return _socketChannel; + } + + public void start() + { + } + + public ByteBufferSender getSender() + { + return this; + } + + public void close() + { + LOGGER.debug("Closing " + _remoteSocketAddress); + if(_closed.compareAndSet(false,true)) + { + _stateChanged.set(true); + getSelector().wakeup(); + } + } + + public SocketAddress getRemoteAddress() + { + return _socketChannel.socket().getRemoteSocketAddress(); + } + + public SocketAddress getLocalAddress() + { + return _socketChannel.socket().getLocalSocketAddress(); + } + + public void setMaxWriteIdle(int sec) + { + _maxWriteIdle = sec; + } + + public void setMaxReadIdle(int sec) + { + _maxReadIdle = sec; } @Override - public void send(final ByteBuffer msg) + public Principal getPeerPrincipal() { - if (_closed.get()) + synchronized (_lock) { - throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed"); + if(!_principalChecked) + { + if (_sslEngine != null) + { + try + { + _principal = _sslEngine.getSession().getPeerPrincipal(); + } + catch (SSLPeerUnverifiedException e) + { + return null; + } + } + + _principalChecked = true; + } + + return _principal; } - // append to list and do selector wakeup - _buffers.add(msg); - _stateChanged.set(true); } + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } + + public boolean canRead() + { + return true; + } + + public boolean waitingForWrite() + { + return !_fullyWritten; + } + + public boolean isStateChanged() + { + + return _stateChanged.get(); + } public boolean doWork() { @@ -190,7 +296,12 @@ public class NonBlockingSenderReceiver implements ByteBufferSender catch (IOException e) { LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); - close(); + LOGGER.debug("Closing " + _remoteSocketAddress); + if(_closed.compareAndSet(false,true)) + { + _stateChanged.set(true); + getSelector().wakeup(); + } } } else @@ -241,119 +352,22 @@ public class NonBlockingSenderReceiver implements ByteBufferSender } - @Override - public void flush() + public SelectorThread getSelector() { - _stateChanged.set(true); - _connection.getSelector().wakeup(); - + return _selector; } - @Override - public void close() + public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes) { - LOGGER.debug("Closing " + _remoteSocketAddress); - if(_closed.compareAndSet(false,true)) - { - _stateChanged.set(true); - _connection.getSelector().wakeup(); - } - } - - private boolean doWrite() throws IOException - { - - ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; - Iterator<ByteBuffer> bufferIterator = _buffers.iterator(); - for (int i = 0; i < bufArray.length; i++) - { - bufArray[i] = bufferIterator.next(); - } - - int byteBuffersWritten = 0; - - if(_transportEncryption == TransportEncryption.NONE) - { - - - long written = _socketChannel.write(bufArray); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Written " + written + " bytes"); - } - - for (ByteBuffer buf : bufArray) - { - if (buf.remaining() == 0) - { - byteBuffersWritten++; - _buffers.poll(); - } - } - - - return bufArray.length == byteBuffersWritten; - } - else if(_transportEncryption == TransportEncryption.TLS) - { - int remaining = 0; - do - { - if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) - { - _workDone = true; - final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); - _status = _sslEngine.wrap(bufArray, netBuffer); - runSSLEngineTasks(_status); - - netBuffer.flip(); - remaining = netBuffer.remaining(); - if (remaining != 0) - { - _encryptedOutput.add(netBuffer); - } - for (ByteBuffer buf : bufArray) - { - if (buf.remaining() == 0) - { - byteBuffersWritten++; - _buffers.poll(); - } - } - } - - } - while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP); - ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); - long written = _socketChannel.write(encryptedBuffers); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Written " + written + " encrypted bytes"); - } - ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator(); - while(iter.hasNext()) - { - ByteBuffer buf = iter.next(); - if(buf.remaining() == 0) - { - iter.remove(); - } - else - { - break; - } - } - - return bufArray.length == byteBuffersWritten; - - } - else - { - return true; - } + return headerBytes[0] == -128 && + headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[4] == 0 || // SSL 3.0 + headerBytes[4] == 1 || // TLS 1.0 + headerBytes[4] == 2 || // TLS 1.1 + headerBytes[4] == 3); } - private boolean doRead() throws IOException + public boolean doRead() throws IOException { boolean readData = false; if(_transportEncryption == TransportEncryption.NONE) @@ -475,26 +489,100 @@ public class NonBlockingSenderReceiver implements ByteBufferSender return readData; } - private boolean runSSLEngineTasks(final SSLEngineResult status) + public boolean doWrite() throws IOException { - if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) + + ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; + Iterator<ByteBuffer> bufferIterator = _buffers.iterator(); + for (int i = 0; i < bufArray.length; i++) { - Runnable task; - while((task = _sslEngine.getDelegatedTask()) != null) + bufArray[i] = bufferIterator.next(); + } + + int byteBuffersWritten = 0; + + if(_transportEncryption == TransportEncryption.NONE) + { + + + long written = _socketChannel.write(bufArray); + if (LOGGER.isDebugEnabled()) { - task.run(); + LOGGER.debug("Written " + written + " bytes"); } - return true; + + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + + + return bufArray.length == byteBuffersWritten; } - return false; - } + else if(_transportEncryption == TransportEncryption.TLS) + { + int remaining = 0; + do + { + if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) + { + _workDone = true; + final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); + _status = _sslEngine.wrap(bufArray, netBuffer); + runSSLEngineTasks(_status); - private boolean looksLikeSSL(byte[] headerBytes) - { - return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); + netBuffer.flip(); + remaining = netBuffer.remaining(); + if (remaining != 0) + { + _encryptedOutput.add(netBuffer); + } + for (ByteBuffer buf : bufArray) + { + if (buf.remaining() == 0) + { + byteBuffersWritten++; + _buffers.poll(); + } + } + } + + } + while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP); + ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); + long written = _socketChannel.write(encryptedBuffers); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " encrypted bytes"); + } + ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator(); + while(iter.hasNext()) + { + ByteBuffer buf = iter.next(); + if(buf.remaining() == 0) + { + iter.remove(); + } + else + { + break; + } + } + + return bufArray.length == byteBuffersWritten; + + } + else + { + return true; + } } - private boolean looksLikeSSLv3ClientHello(byte[] headerBytes) + public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes) { return headerBytes[0] == 22 && // SSL Handshake (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x @@ -505,47 +593,42 @@ public class NonBlockingSenderReceiver implements ByteBufferSender (headerBytes[5] == 1); // client_hello } - private boolean looksLikeSSLv2ClientHello(byte[] headerBytes) - { - return headerBytes[0] == -128 && - headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[4] == 0 || // SSL 3.0 - headerBytes[4] == 1 || // TLS 1.0 - headerBytes[4] == 2 || // TLS 1.1 - headerBytes[4] == 3); - } - - public Principal getPeerPrincipal() + public boolean runSSLEngineTasks(final SSLEngineResult status) { - - if (_sslEngine != null) + if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) { - try - { - return _sslEngine.getSession().getPeerPrincipal(); - } - catch (SSLPeerUnverifiedException e) + Runnable task; + while((task = _sslEngine.getDelegatedTask()) != null) { - return null; + task.run(); } + return true; } - - return null; + return false; } - public boolean canRead() + public boolean looksLikeSSL(final byte[] headerBytes) { - return true; + return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); } - public boolean waitingForWrite() + @Override + public void send(final ByteBuffer msg) { - return !_fullyWritten; + if (_closed.get()) + { + throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed"); + } + // append to list and do selector wakeup + _buffers.add(msg); + _stateChanged.set(true); } - public boolean isStateChanged() + @Override + public void flush() { - return _stateChanged.get(); - } + _stateChanged.set(true); + getSelector().wakeup(); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java index 1c49efc294..79313712a5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.transport.network.io; +package org.apache.qpid.server.transport; import java.io.IOException; import java.net.InetSocketAddress; @@ -33,10 +33,12 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.configuration.CommonProperties; import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.TransportEncryption; +import org.apache.qpid.transport.network.io.AbstractNetworkTransport; +import org.apache.qpid.transport.network.io.IdleTimeoutTicker; public class NonBlockingNetworkTransport { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java index bd8d3ad804..786f1915a7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.qpid.transport.network.io; +package org.apache.qpid.server.transport; import java.io.IOException; import java.nio.channels.ClosedChannelException; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java index 5f5467db07..7874437a2f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java @@ -36,7 +36,6 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.network.TransportEncryption; -import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport; class TCPandSSLTransport implements AcceptingTransport { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java index e670c1f88b..dd43ae7e11 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.protocol.v0_10; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java index 5c919252b8..4231045afd 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import java.net.InetSocketAddress; import java.net.SocketAddress; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 75a162deb8..5f227e5f18 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -29,7 +29,7 @@ import javax.security.auth.Subject; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Consumer; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index d9b4495d6e..a2f1f1a4ba 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 8fdee7a0f7..d33297fbf6 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -35,7 +35,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java index e11d2ce9bb..a7b08e3f83 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java index b05edc5d04..b9f013d253 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.test.utils.QpidTestCase; public class WindowCreditManagerTest extends QpidTestCase diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 945e18e560..ed075038e6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -58,7 +58,7 @@ import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java index 2d32617106..6e5aab2dd5 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java @@ -19,7 +19,7 @@ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class NoAckCreditManager extends AbstractFlowCreditManager diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java index e63645ed09..a869a707e1 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; import org.apache.qpid.server.flow.FlowCreditManager; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java index 0058fe86a9..e8cf028069 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java index 7253111114..8817e79aff 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java index e72cc4d058..af37b17d85 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index b6c23dff7a..a442b5c437 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -44,7 +44,7 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.End; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 589bd0ec04..ebd23f31ae 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -40,7 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java index fa8134cb55..e72dc17b57 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 22cf76a3ea..d361dce682 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -52,7 +52,7 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Consumer; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java deleted file mode 100644 index df4d7c7721..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.protocol; - -import javax.security.auth.Subject; - -public interface ServerProtocolEngine extends ProtocolEngine -{ - /** - * Gets the connection ID associated with this ProtocolEngine - */ - long getConnectionId(); - - Subject getSubject(); - - boolean isTransportBlockedForWriting(); - - void setTransportBlockedForWriting(boolean blocked); - - void setMessageAssignmentSuspended(boolean value); - - boolean isMessageAssignmentSuspended(); - - void processPendingMessages(); -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java index 54a2a360bb..71704fca3a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java @@ -25,7 +25,7 @@ import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.TransportActivity; -class IdleTimeoutTicker implements Ticker +public class IdleTimeoutTicker implements Ticker { private final TransportActivity _transport; private final int _defaultTimeout; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java deleted file mode 100644 index 7c7c6929d1..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java +++ /dev/null @@ -1,187 +0,0 @@ -/* -* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import java.net.SocketAddress; -import java.nio.channels.SocketChannel; -import java.security.Principal; -import java.util.Collection; -import java.util.Set; - -import javax.net.ssl.SSLContext; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.transport.ByteBufferSender; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.Ticker; -import org.apache.qpid.transport.network.TransportEncryption; - -public class NonBlockingConnection implements NetworkConnection -{ - private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); - private final SocketChannel _socketChannel; - private final long _timeout; - private final NonBlockingSenderReceiver _nonBlockingSenderReceiver; - private final Ticker _ticker; - private final SelectorThread _selector; - private int _maxReadIdle; - private int _maxWriteIdle; - private Principal _principal; - private boolean _principalChecked; - private final Object _lock = new Object(); - - public NonBlockingConnection(SocketChannel socketChannel, - ServerProtocolEngine delegate, - int sendBufferSize, - int receiveBufferSize, - long timeout, - Ticker ticker, - final Set<TransportEncryption> encryptionSet, - final SSLContext sslContext, - final boolean wantClientAuth, - final boolean needClientAuth, - final Collection<String> enabledCipherSuites, - final Collection<String> disabledCipherSuites, - final Runnable onTransportEncryptionAction, - final SelectorThread selectorThread) - { - _socketChannel = socketChannel; - _timeout = timeout; - _ticker = ticker; - _selector = selectorThread; - - _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(this, - delegate, - receiveBufferSize, - ticker, - encryptionSet, - sslContext, - wantClientAuth, - needClientAuth, - enabledCipherSuites, - disabledCipherSuites, - onTransportEncryptionAction); - - } - - - public Ticker getTicker() - { - return _ticker; - } - - public SocketChannel getSocketChannel() - { - return _socketChannel; - } - - public void start() - { - } - - public ByteBufferSender getSender() - { - return _nonBlockingSenderReceiver; - } - - public void close() - { - _nonBlockingSenderReceiver.close(); - } - - public SocketAddress getRemoteAddress() - { - return _socketChannel.socket().getRemoteSocketAddress(); - } - - public SocketAddress getLocalAddress() - { - return _socketChannel.socket().getLocalSocketAddress(); - } - - public void setMaxWriteIdle(int sec) - { - _maxWriteIdle = sec; - } - - public void setMaxReadIdle(int sec) - { - _maxReadIdle = sec; - } - - @Override - public Principal getPeerPrincipal() - { - synchronized (_lock) - { - if(!_principalChecked) - { - - _principal = _nonBlockingSenderReceiver.getPeerPrincipal(); - - _principalChecked = true; - } - - return _principal; - } - } - - @Override - public int getMaxReadIdle() - { - return _maxReadIdle; - } - - @Override - public int getMaxWriteIdle() - { - return _maxWriteIdle; - } - - public boolean canRead() - { - return _nonBlockingSenderReceiver.canRead(); - } - - public boolean waitingForWrite() - { - return _nonBlockingSenderReceiver.waitingForWrite(); - } - - public boolean isStateChanged() - { - - return _nonBlockingSenderReceiver.isStateChanged(); - } - - public boolean doWork() - { - return _nonBlockingSenderReceiver.doWork(); - } - - public SelectorThread getSelector() - { - return _selector; - } -} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index 84eb761899..a1e30ac83e 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -33,7 +33,6 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.Set; -import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; |
