diff options
| author | Keith Wall <kwall@apache.org> | 2014-12-11 13:08:36 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-12-11 13:08:36 +0000 |
| commit | d558eca0174f9778af78f2906fef580fc8b8e294 (patch) | |
| tree | a19a932e7d5d6efa3ed21f1636dd6c73367ec0a2 /qpid/java | |
| parent | ebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa (diff) | |
| download | qpid-python-d558eca0174f9778af78f2906fef580fc8b8e294.tar.gz | |
Ensure that the NonBlockingSenderReceiver closes (and thread stops) if the peer closes the connection. Remove some more dead code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
8 files changed, 26 insertions, 852 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 9df4ad87e0..6ea9f3600c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -84,11 +84,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _onCloseTask = onCloseTask; } - void setTransport(Transport transport) - { - _transport = transport; - } - public SocketAddress getRemoteAddress() { return _delegate.getRemoteAddress(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java deleted file mode 100644 index 09e8a68fb0..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLEngineResult.HandshakeStatus; -import javax.net.ssl.SSLEngineResult.Status; -import javax.net.ssl.SSLException; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; -import org.apache.qpid.transport.util.Logger; - -public class SSLBufferingSender implements Sender<ByteBuffer> -{ - private static final Logger LOGGER = Logger.get(SSLBufferingSender.class); - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); - - private final Sender<ByteBuffer> _delegate; - private final SSLEngine _engine; - private final int _sslBufSize; - private final SSLStatus _sslStatus; - - private String _hostname; - - private final AtomicBoolean _closed = new AtomicBoolean(false); - private ByteBuffer _appData = EMPTY_BYTE_BUFFER; - - - public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus) - { - _engine = engine; - _delegate = delegate; - _sslBufSize = engine.getSession().getPacketBufferSize(); - _sslStatus = sslStatus; - } - - public void setHostname(String hostname) - { - _hostname = hostname; - } - - public void close() - { - if (!_closed.getAndSet(true)) - { - if (_engine.isOutboundDone()) - { - return; - } - LOGGER.debug("Closing SSL connection"); - doSend(); - _engine.closeOutbound(); - try - { - tearDownSSLConnection(); - } - catch(Exception e) - { - throw new SenderException("Error closing SSL connection",e); - } - - - synchronized(_sslStatus.getSslLock()) - { - while (!_engine.isOutboundDone()) - { - try - { - _sslStatus.getSslLock().wait(); - } - catch(InterruptedException e) - { - // pass - } - - } - } - _delegate.close(); - } - } - - private void tearDownSSLConnection() throws Exception - { - ByteBuffer netData = getNetDataBuffer(); - SSLEngineResult result = _engine.wrap(ByteBuffer.allocate(0), netData); - Status status = result.getStatus(); - int read = result.bytesProduced(); - while (status != Status.CLOSED) - { - if (status == Status.BUFFER_OVERFLOW) - { - netData.clear(); - } - if(read > 0) - { - int limit = netData.limit(); - netData.limit(netData.position()); - netData.position(netData.position() - read); - - ByteBuffer data = netData.slice(); - - netData.limit(limit); - netData.position(netData.position() + read); - - _delegate.send(data); - flush(); - } - result = _engine.wrap(ByteBuffer.allocate(0), netData); - status = result.getStatus(); - read = result.bytesProduced(); - } - } - - private ByteBuffer getNetDataBuffer() - { - return ByteBuffer.allocate(_sslBufSize); - } - - public void flush() - { - _delegate.flush(); - } - - public void send() - { - if(!_closed.get()) - { - doSend(); - } - } - - public synchronized void send(ByteBuffer appData) - { - boolean buffered; - if(buffered = _appData.hasRemaining()) - { - ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining()); - newBuf.put(_appData); - newBuf.put(appData); - newBuf.flip(); - _appData = newBuf; - } - if (_closed.get()) - { - throw new SenderException("SSL Sender is closed"); - } - doSend(); - if(!appData.hasRemaining()) - { - _appData = EMPTY_BYTE_BUFFER; - } - else if(!buffered) - { - _appData = ByteBuffer.allocate(appData.remaining()); - _appData.put(appData); - _appData.flip(); - } - } - - private synchronized void doSend() - { - - HandshakeStatus handshakeStatus; - Status status; - - while((_appData.hasRemaining() || _engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) - && !_sslStatus.getSslErrorFlag()) - { - ByteBuffer netData = getNetDataBuffer(); - - int read = 0; - try - { - SSLEngineResult result = _engine.wrap(_appData, netData); - read = result.bytesProduced(); - status = result.getStatus(); - handshakeStatus = result.getHandshakeStatus(); - } - catch(SSLException e) - { - // Should this set _sslError?? - throw new SenderException("SSL, Error occurred while encrypting data",e); - } - - if(read > 0) - { - int limit = netData.limit(); - netData.limit(netData.position()); - netData.position(netData.position() - read); - - ByteBuffer data = netData.slice(); - - netData.limit(limit); - netData.position(netData.position() + read); - - _delegate.send(data); - } - - switch(status) - { - case CLOSED: - throw new SenderException("SSLEngine is closed"); - - case BUFFER_OVERFLOW: - netData.clear(); - continue; - - case OK: - break; // do nothing - - default: - throw new IllegalStateException("SSLReceiver: Invalid State " + status); - } - - switch (handshakeStatus) - { - case NEED_WRAP: - if (netData.hasRemaining()) - { - continue; - } - - case NEED_TASK: - doTasks(); - break; - - case NEED_UNWRAP: - flush(); - return; - - case FINISHED: - if (_hostname != null) - { - SSLUtil.verifyHostname(_engine, _hostname); - } - - case NOT_HANDSHAKING: - break; //do nothing - - default: - throw new IllegalStateException("SSLSender: Invalid State " + status); - } - - } - } - - private void doTasks() - { - Runnable runnable; - while ((runnable = _engine.getDelegatedTask()) != null) { - runnable.run(); - } - } - - public void setIdleTimeout(int i) - { - _delegate.setIdleTimeout(i); - } -} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 0224f1b015..4adf472c5d 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -223,11 +223,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return getRemoteAddress().toString(); } - public String getAuthId() - { - return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName(); - } - public boolean isDurable() { return false; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java index c361dc7df6..9d0fe5ddf6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java @@ -49,6 +49,7 @@ import org.apache.qpid.transport.network.TransportActivity; import org.apache.qpid.transport.network.TransportEncryption; import org.apache.qpid.transport.network.security.ssl.SSLUtil; +// TODO we are no longer using the IncomingNetworkTransport public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java index c2635a8dfa..372e1bc5fd 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java @@ -49,7 +49,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); - private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , + private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); private AcceptingThread _acceptor; @@ -70,24 +70,11 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport public void close() { -/* - if(_connection != null) - { - _connection.close(); - } -*/ if(_acceptor != null) { _acceptor.close(); } } -/* - - public NetworkConnection getConnection() - { - return _connection; - } -*/ public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, @@ -115,10 +102,10 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport { private final Set<TransportEncryption> _encryptionSet; private volatile boolean _closed = false; - private NetworkTransportConfiguration _config; - private ProtocolEngineFactory _factory; - private SSLContext _sslContext; - private ServerSocketChannel _serverSocket; + private final NetworkTransportConfiguration _config; + private final ProtocolEngineFactory _factory; + private final SSLContext _sslContext; + private final ServerSocketChannel _serverSocket; private int _timeout; private AcceptingThread(NetworkTransportConfiguration config, @@ -184,7 +171,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport if(engine != null) { socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); - socket.socket().setSoTimeout(1000 * HANSHAKE_TIMEOUT); + socket.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); final Integer sendBufferSize = _config.getSendBufferSize(); final Integer receiveBufferSize = _config.getReceiveBufferSize(); @@ -216,12 +203,11 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport } }); - connection.setMaxReadIdle(HANSHAKE_TIMEOUT); + engine.setNetworkConnection(connection, connection.getSender()); + connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); ticker.setConnection(connection); - engine.setNetworkConnection(connection, connection.getSender()); - connection.start(); } else diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java deleted file mode 100644 index ccd7170b62..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import java.io.IOException; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.net.ssl.SSLSocket; - -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.Ticker; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.util.SystemUtils; - -/** - * IoReceiver - * - */ - -final class NonBlockingReceiver implements Runnable -{ - - private static final Logger log = Logger.get(NonBlockingReceiver.class); - - private final Receiver<ByteBuffer> receiver; - private final int bufferSize; - private final SocketChannel socket; - private final long timeout; - private final AtomicBoolean closed = new AtomicBoolean(false); - private final Thread receiverThread; - private static final boolean shutdownBroken; - - private Ticker _ticker; - static - { - shutdownBroken = SystemUtils.isWindows(); - } - - public NonBlockingReceiver(SocketChannel socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) - { - this.receiver = receiver; - this.bufferSize = bufferSize; - this.socket = socket; - this.timeout = timeout; - - try - { - //Create but deliberately don't start the thread. - receiverThread = Threading.getThreadFactory().createThread(this); - } - catch(Exception e) - { - throw new RuntimeException("Error creating IOReceiver thread",e); - } - receiverThread.setDaemon(true); - receiverThread.setName(String.format("IoReceiver - %s", socket.socket().getRemoteSocketAddress())); - } - - public void initiate() - { - receiverThread.start(); - } - - public void close() - { - close(false); - } - - void close(boolean block) - { - if (!closed.getAndSet(true)) - { - try - { - try - { - if (shutdownBroken || socket.socket() instanceof SSLSocket) - { - socket.close(); - } - else - { - socket.shutdownInput(); - } - } - catch(SocketException se) - { - if(!socket.socket().isClosed() && !socket.socket().isInputShutdown()) - { - throw se; - } - } - if (block && Thread.currentThread() != receiverThread) - { - receiverThread.join(timeout); - if (receiverThread.isAlive()) - { - throw new TransportException("join timed out"); - } - } - } - catch (InterruptedException e) - { - throw new TransportException(e); - } - catch (IOException e) - { - throw new TransportException(e); - } - - } - } - - public void run() - { - final int threshold = bufferSize / 2; - - // I set the read buffer size similar to SO_RCVBUF - // Haven't tested with a lower value to see if it's better or worse - ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - try - { - int read = 0; - long currentTime; - while(read != -1) - { - try - { - while ((read = socket.read(buffer)) != -1) - { - if (read > 0) - { - ByteBuffer b = buffer.duplicate(); - b.flip(); - receiver.received(b); - - if (buffer.remaining() < threshold) - { - buffer = ByteBuffer.allocate(bufferSize); - } - else - { - buffer = buffer.slice(); - } - } - currentTime = System.currentTimeMillis(); - - if(_ticker != null) - { - int tick = _ticker.getTimeToNextTick(currentTime); - if(tick <= 0) - { - tick = _ticker.tick(currentTime); - } - try - { - if(!socket.socket().isClosed()) - { - socket.socket().setSoTimeout(tick <= 0 ? 1 : tick); - } - } - catch(SocketException e) - { - // ignore - closed socket - } - } - } - } - catch (SocketTimeoutException e) - { - currentTime = System.currentTimeMillis(); - if(_ticker != null) - { - final int tick = _ticker.tick(currentTime); - if(!socket.socket().isClosed()) - { - try - { - socket.socket().setSoTimeout(tick <= 0 ? 1 : tick ); - } - catch(SocketException ex) - { - // ignore - closed socket - } - } - } - } - } - } - catch (Exception t) - { - if (shouldReport(t)) - { - receiver.exception(t); - } - } - finally - { - receiver.closed(); - try - { - socket.close(); - } - catch(Exception e) - { - log.warn(e, "Error closing socket"); - } - } - } - - private boolean shouldReport(Throwable t) - { - boolean brokenClose = closed.get() && - shutdownBroken && - t instanceof SocketException && - "socket closed".equalsIgnoreCase(t.getMessage()); - - boolean sslSocketClosed = closed.get() && - socket.socket() instanceof SSLSocket && - t instanceof SocketException && - "Socket is closed".equalsIgnoreCase(t.getMessage()); - - boolean recvFailed = closed.get() && - shutdownBroken && - t instanceof SocketException && - "Socket operation on nonsocket: recv failed".equalsIgnoreCase(t.getMessage()); - - return !brokenClose && !sslSocketClosed && !recvFailed; - } - - public Ticker getTicker() - { - return _ticker; - } - - public void setTicker(Ticker ticker) - { - _ticker = ticker; - } - - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java deleted file mode 100644 index e3d604d2ba..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.transport.network.io; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderClosedException; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.util.Logger; - - -public final class NonBlockingSender implements Runnable, Sender<ByteBuffer> -{ - - private static final Logger log = Logger.get(NonBlockingSender.class); - - // by starting here, we ensure that we always test the wraparound - // case, we should probably make this configurable somehow so that - // we can test other cases as well - private final static int START = Integer.MAX_VALUE - 10; - - private final long timeout; - private final SocketChannel socket; - - private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); - private final Object _isEmpty = new Object(); - - private volatile boolean idle = true; - private final AtomicBoolean closed = new AtomicBoolean(false); - private final Thread senderThread; - private NonBlockingReceiver _receiver; - private final String _remoteSocketAddress; - - private volatile Throwable exception = null; - - public NonBlockingSender(SocketChannel socket, int bufferSize, long timeout) - { - this.socket = socket; - this.timeout = timeout; - _remoteSocketAddress = socket.socket().getRemoteSocketAddress().toString(); - - - try - { - //Create but deliberately don't start the thread. - senderThread = Threading.getThreadFactory().createThread(this); - } - catch(Exception e) - { - throw new Error("Error creating IOSender thread",e); - } - - senderThread.setDaemon(true); - senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress)); - } - - public void initiate() - { - senderThread.start(); - } - - - public void send(ByteBuffer buf) - { - checkNotAlreadyClosed(); - - if(!senderThread.isAlive()) - { - throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress)); - } - - - _buffers.add(buf); - synchronized (_isEmpty) - { - _isEmpty.notifyAll(); - } - - } - - public void flush() - { - if (idle) - { - synchronized (_isEmpty) - { - _isEmpty.notify(); - } - } - } - - public void close() - { - close(true, true); - } - - private void close(boolean awaitSenderBeforeClose, boolean reportException) - { - if (!closed.getAndSet(true)) - { - - synchronized (_isEmpty) - { - _isEmpty.notify(); - } - - try - { - if (awaitSenderBeforeClose) - { - awaitSenderThreadShutdown(); - } - } - finally - { - closeReceiver(); - } - if (reportException && exception != null) - { - throw new SenderException(exception); - } - } - } - - private void closeReceiver() - { - if(_receiver != null) - { - try - { - _receiver.close(); - } - catch(RuntimeException e) - { - log.error(e, "Exception closing receiver for socket %s", _remoteSocketAddress); - throw new SenderException(e.getMessage(), e); - } - } - } - - public void run() - { - while (true) - { - - if (closed.get()) - { - break; - } - - idle = true; - - synchronized (_isEmpty) - { - while (_buffers.isEmpty() && !closed.get()) - { - try - { - _isEmpty.wait(); - } - catch (InterruptedException e) - { - // pass - } - } - } - - idle = false; - - ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; - Iterator<ByteBuffer> bufferIterator = _buffers.iterator(); - for(int i = 0; i < bufArray.length; i++) - { - bufArray[i] = bufferIterator.next(); - } - - try - { - socket.write(bufArray); - for(ByteBuffer buf : bufArray) - { - if(buf.remaining() == 0) - { - _buffers.poll(); - } - else - { - break; - } - } - } - catch (IOException e) - { - log.info("Exception in thread sending to '" + _remoteSocketAddress + "': " + e); - exception = e; - close(false, false); - break; - } - - } - - } - - public void setIdleTimeout(int i) - { - try - { - socket.socket().setSoTimeout(i); - } - catch (Exception e) - { - throw new SenderException(e); - } - } - - public void setReceiver(NonBlockingReceiver receiver) - { - _receiver = receiver; - } - - private void awaitSenderThreadShutdown() - { - if (Thread.currentThread() != senderThread) - { - try - { - senderThread.join(timeout); - if (senderThread.isAlive()) - { - log.error("join timed out for socket %s to stop", _remoteSocketAddress); - throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress)); - } - } - catch (InterruptedException e) - { - log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress); - throw new SenderException(e); - } - } - } - - private void checkNotAlreadyClosed() - { - if (closed.get()) - { - throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception); - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index dfc2697c79..1f6d50e68f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -178,6 +178,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> // read as much as you can // try to write all pending byte buffers + while (!_closed.get()) { @@ -206,6 +207,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> fullyWritten ? SelectionKey.OP_READ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ)); + } catch (IOException e) { @@ -359,7 +361,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> { _currentBuffer = ByteBuffer.allocate(_receiveBufSize); } - _socketChannel.read(_currentBuffer); + int read = _socketChannel.read(_currentBuffer); + if (read == -1) + { + _closed.set(true); + } remaining = _currentBuffer.remaining(); if (LOGGER.isDebugEnabled()) { @@ -378,6 +384,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> while(!_closed.get() && (read > 0 || unwrapped > 0) && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED)) { read = _socketChannel.read(_netInputBuffer); + if (read == -1) + { + _closed.set(true); + } + LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer); _netInputBuffer.flip(); ByteBuffer appInputBuffer = @@ -403,6 +414,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> { read = _socketChannel.read(_netInputBuffer); + if (read == -1) + { + _closed.set(true); + } + LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer); if (_netInputBuffer.position() >= 6) |
