summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-12-11 13:08:36 +0000
committerKeith Wall <kwall@apache.org>2014-12-11 13:08:36 +0000
commitd558eca0174f9778af78f2906fef580fc8b8e294 (patch)
treea19a932e7d5d6efa3ed21f1636dd6c73367ec0a2 /qpid/java
parentebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa (diff)
downloadqpid-python-d558eca0174f9778af78f2906fef580fc8b8e294.tar.gz
Ensure that the NonBlockingSenderReceiver closes (and thread stops) if the peer closes the connection. Remove some more dead code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java281
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java30
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java266
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java272
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java18
8 files changed, 26 insertions, 852 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 9df4ad87e0..6ea9f3600c 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -84,11 +84,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_onCloseTask = onCloseTask;
}
- void setTransport(Transport transport)
- {
- _transport = transport;
- }
-
public SocketAddress getRemoteAddress()
{
return _delegate.getRemoteAddress();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java
deleted file mode 100644
index 09e8a68fb0..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLException;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-import org.apache.qpid.transport.util.Logger;
-
-public class SSLBufferingSender implements Sender<ByteBuffer>
-{
- private static final Logger LOGGER = Logger.get(SSLBufferingSender.class);
- private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
-
- private final Sender<ByteBuffer> _delegate;
- private final SSLEngine _engine;
- private final int _sslBufSize;
- private final SSLStatus _sslStatus;
-
- private String _hostname;
-
- private final AtomicBoolean _closed = new AtomicBoolean(false);
- private ByteBuffer _appData = EMPTY_BYTE_BUFFER;
-
-
- public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
- {
- _engine = engine;
- _delegate = delegate;
- _sslBufSize = engine.getSession().getPacketBufferSize();
- _sslStatus = sslStatus;
- }
-
- public void setHostname(String hostname)
- {
- _hostname = hostname;
- }
-
- public void close()
- {
- if (!_closed.getAndSet(true))
- {
- if (_engine.isOutboundDone())
- {
- return;
- }
- LOGGER.debug("Closing SSL connection");
- doSend();
- _engine.closeOutbound();
- try
- {
- tearDownSSLConnection();
- }
- catch(Exception e)
- {
- throw new SenderException("Error closing SSL connection",e);
- }
-
-
- synchronized(_sslStatus.getSslLock())
- {
- while (!_engine.isOutboundDone())
- {
- try
- {
- _sslStatus.getSslLock().wait();
- }
- catch(InterruptedException e)
- {
- // pass
- }
-
- }
- }
- _delegate.close();
- }
- }
-
- private void tearDownSSLConnection() throws Exception
- {
- ByteBuffer netData = getNetDataBuffer();
- SSLEngineResult result = _engine.wrap(ByteBuffer.allocate(0), netData);
- Status status = result.getStatus();
- int read = result.bytesProduced();
- while (status != Status.CLOSED)
- {
- if (status == Status.BUFFER_OVERFLOW)
- {
- netData.clear();
- }
- if(read > 0)
- {
- int limit = netData.limit();
- netData.limit(netData.position());
- netData.position(netData.position() - read);
-
- ByteBuffer data = netData.slice();
-
- netData.limit(limit);
- netData.position(netData.position() + read);
-
- _delegate.send(data);
- flush();
- }
- result = _engine.wrap(ByteBuffer.allocate(0), netData);
- status = result.getStatus();
- read = result.bytesProduced();
- }
- }
-
- private ByteBuffer getNetDataBuffer()
- {
- return ByteBuffer.allocate(_sslBufSize);
- }
-
- public void flush()
- {
- _delegate.flush();
- }
-
- public void send()
- {
- if(!_closed.get())
- {
- doSend();
- }
- }
-
- public synchronized void send(ByteBuffer appData)
- {
- boolean buffered;
- if(buffered = _appData.hasRemaining())
- {
- ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining());
- newBuf.put(_appData);
- newBuf.put(appData);
- newBuf.flip();
- _appData = newBuf;
- }
- if (_closed.get())
- {
- throw new SenderException("SSL Sender is closed");
- }
- doSend();
- if(!appData.hasRemaining())
- {
- _appData = EMPTY_BYTE_BUFFER;
- }
- else if(!buffered)
- {
- _appData = ByteBuffer.allocate(appData.remaining());
- _appData.put(appData);
- _appData.flip();
- }
- }
-
- private synchronized void doSend()
- {
-
- HandshakeStatus handshakeStatus;
- Status status;
-
- while((_appData.hasRemaining() || _engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
- && !_sslStatus.getSslErrorFlag())
- {
- ByteBuffer netData = getNetDataBuffer();
-
- int read = 0;
- try
- {
- SSLEngineResult result = _engine.wrap(_appData, netData);
- read = result.bytesProduced();
- status = result.getStatus();
- handshakeStatus = result.getHandshakeStatus();
- }
- catch(SSLException e)
- {
- // Should this set _sslError??
- throw new SenderException("SSL, Error occurred while encrypting data",e);
- }
-
- if(read > 0)
- {
- int limit = netData.limit();
- netData.limit(netData.position());
- netData.position(netData.position() - read);
-
- ByteBuffer data = netData.slice();
-
- netData.limit(limit);
- netData.position(netData.position() + read);
-
- _delegate.send(data);
- }
-
- switch(status)
- {
- case CLOSED:
- throw new SenderException("SSLEngine is closed");
-
- case BUFFER_OVERFLOW:
- netData.clear();
- continue;
-
- case OK:
- break; // do nothing
-
- default:
- throw new IllegalStateException("SSLReceiver: Invalid State " + status);
- }
-
- switch (handshakeStatus)
- {
- case NEED_WRAP:
- if (netData.hasRemaining())
- {
- continue;
- }
-
- case NEED_TASK:
- doTasks();
- break;
-
- case NEED_UNWRAP:
- flush();
- return;
-
- case FINISHED:
- if (_hostname != null)
- {
- SSLUtil.verifyHostname(_engine, _hostname);
- }
-
- case NOT_HANDSHAKING:
- break; //do nothing
-
- default:
- throw new IllegalStateException("SSLSender: Invalid State " + status);
- }
-
- }
- }
-
- private void doTasks()
- {
- Runnable runnable;
- while ((runnable = _engine.getDelegatedTask()) != null) {
- runnable.run();
- }
- }
-
- public void setIdleTimeout(int i)
- {
- _delegate.setIdleTimeout(i);
- }
-}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 0224f1b015..4adf472c5d 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -223,11 +223,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
return getRemoteAddress().toString();
}
- public String getAuthId()
- {
- return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
- }
-
public boolean isDurable()
{
return false;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
index c361dc7df6..9d0fe5ddf6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
@@ -49,6 +49,7 @@ import org.apache.qpid.transport.network.TransportActivity;
import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+// TODO we are no longer using the IncomingNetworkTransport
public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
{
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
index c2635a8dfa..372e1bc5fd 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -49,7 +49,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
- private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private AcceptingThread _acceptor;
@@ -70,24 +70,11 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
public void close()
{
-/*
- if(_connection != null)
- {
- _connection.close();
- }
-*/
if(_acceptor != null)
{
_acceptor.close();
}
}
-/*
-
- public NetworkConnection getConnection()
- {
- return _connection;
- }
-*/
public void accept(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
@@ -115,10 +102,10 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
{
private final Set<TransportEncryption> _encryptionSet;
private volatile boolean _closed = false;
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext;
- private ServerSocketChannel _serverSocket;
+ private final NetworkTransportConfiguration _config;
+ private final ProtocolEngineFactory _factory;
+ private final SSLContext _sslContext;
+ private final ServerSocketChannel _serverSocket;
private int _timeout;
private AcceptingThread(NetworkTransportConfiguration config,
@@ -184,7 +171,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
if(engine != null)
{
socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
- socket.socket().setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+ socket.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
final Integer sendBufferSize = _config.getSendBufferSize();
final Integer receiveBufferSize = _config.getReceiveBufferSize();
@@ -216,12 +203,11 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
}
});
- connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+ engine.setNetworkConnection(connection, connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
ticker.setConnection(connection);
- engine.setNetworkConnection(connection, connection.getSender());
-
connection.start();
}
else
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java
deleted file mode 100644
index ccd7170b62..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.io;
-
-import java.io.IOException;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.ssl.SSLSocket;
-
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.Ticker;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.util.SystemUtils;
-
-/**
- * IoReceiver
- *
- */
-
-final class NonBlockingReceiver implements Runnable
-{
-
- private static final Logger log = Logger.get(NonBlockingReceiver.class);
-
- private final Receiver<ByteBuffer> receiver;
- private final int bufferSize;
- private final SocketChannel socket;
- private final long timeout;
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Thread receiverThread;
- private static final boolean shutdownBroken;
-
- private Ticker _ticker;
- static
- {
- shutdownBroken = SystemUtils.isWindows();
- }
-
- public NonBlockingReceiver(SocketChannel socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
- {
- this.receiver = receiver;
- this.bufferSize = bufferSize;
- this.socket = socket;
- this.timeout = timeout;
-
- try
- {
- //Create but deliberately don't start the thread.
- receiverThread = Threading.getThreadFactory().createThread(this);
- }
- catch(Exception e)
- {
- throw new RuntimeException("Error creating IOReceiver thread",e);
- }
- receiverThread.setDaemon(true);
- receiverThread.setName(String.format("IoReceiver - %s", socket.socket().getRemoteSocketAddress()));
- }
-
- public void initiate()
- {
- receiverThread.start();
- }
-
- public void close()
- {
- close(false);
- }
-
- void close(boolean block)
- {
- if (!closed.getAndSet(true))
- {
- try
- {
- try
- {
- if (shutdownBroken || socket.socket() instanceof SSLSocket)
- {
- socket.close();
- }
- else
- {
- socket.shutdownInput();
- }
- }
- catch(SocketException se)
- {
- if(!socket.socket().isClosed() && !socket.socket().isInputShutdown())
- {
- throw se;
- }
- }
- if (block && Thread.currentThread() != receiverThread)
- {
- receiverThread.join(timeout);
- if (receiverThread.isAlive())
- {
- throw new TransportException("join timed out");
- }
- }
- }
- catch (InterruptedException e)
- {
- throw new TransportException(e);
- }
- catch (IOException e)
- {
- throw new TransportException(e);
- }
-
- }
- }
-
- public void run()
- {
- final int threshold = bufferSize / 2;
-
- // I set the read buffer size similar to SO_RCVBUF
- // Haven't tested with a lower value to see if it's better or worse
- ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
- try
- {
- int read = 0;
- long currentTime;
- while(read != -1)
- {
- try
- {
- while ((read = socket.read(buffer)) != -1)
- {
- if (read > 0)
- {
- ByteBuffer b = buffer.duplicate();
- b.flip();
- receiver.received(b);
-
- if (buffer.remaining() < threshold)
- {
- buffer = ByteBuffer.allocate(bufferSize);
- }
- else
- {
- buffer = buffer.slice();
- }
- }
- currentTime = System.currentTimeMillis();
-
- if(_ticker != null)
- {
- int tick = _ticker.getTimeToNextTick(currentTime);
- if(tick <= 0)
- {
- tick = _ticker.tick(currentTime);
- }
- try
- {
- if(!socket.socket().isClosed())
- {
- socket.socket().setSoTimeout(tick <= 0 ? 1 : tick);
- }
- }
- catch(SocketException e)
- {
- // ignore - closed socket
- }
- }
- }
- }
- catch (SocketTimeoutException e)
- {
- currentTime = System.currentTimeMillis();
- if(_ticker != null)
- {
- final int tick = _ticker.tick(currentTime);
- if(!socket.socket().isClosed())
- {
- try
- {
- socket.socket().setSoTimeout(tick <= 0 ? 1 : tick );
- }
- catch(SocketException ex)
- {
- // ignore - closed socket
- }
- }
- }
- }
- }
- }
- catch (Exception t)
- {
- if (shouldReport(t))
- {
- receiver.exception(t);
- }
- }
- finally
- {
- receiver.closed();
- try
- {
- socket.close();
- }
- catch(Exception e)
- {
- log.warn(e, "Error closing socket");
- }
- }
- }
-
- private boolean shouldReport(Throwable t)
- {
- boolean brokenClose = closed.get() &&
- shutdownBroken &&
- t instanceof SocketException &&
- "socket closed".equalsIgnoreCase(t.getMessage());
-
- boolean sslSocketClosed = closed.get() &&
- socket.socket() instanceof SSLSocket &&
- t instanceof SocketException &&
- "Socket is closed".equalsIgnoreCase(t.getMessage());
-
- boolean recvFailed = closed.get() &&
- shutdownBroken &&
- t instanceof SocketException &&
- "Socket operation on nonsocket: recv failed".equalsIgnoreCase(t.getMessage());
-
- return !brokenClose && !sslSocketClosed && !recvFailed;
- }
-
- public Ticker getTicker()
- {
- return _ticker;
- }
-
- public void setTicker(Ticker ticker)
- {
- _ticker = ticker;
- }
-
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java
deleted file mode 100644
index e3d604d2ba..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.transport.network.io;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderClosedException;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.util.Logger;
-
-
-public final class NonBlockingSender implements Runnable, Sender<ByteBuffer>
-{
-
- private static final Logger log = Logger.get(NonBlockingSender.class);
-
- // by starting here, we ensure that we always test the wraparound
- // case, we should probably make this configurable somehow so that
- // we can test other cases as well
- private final static int START = Integer.MAX_VALUE - 10;
-
- private final long timeout;
- private final SocketChannel socket;
-
- private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
- private final Object _isEmpty = new Object();
-
- private volatile boolean idle = true;
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Thread senderThread;
- private NonBlockingReceiver _receiver;
- private final String _remoteSocketAddress;
-
- private volatile Throwable exception = null;
-
- public NonBlockingSender(SocketChannel socket, int bufferSize, long timeout)
- {
- this.socket = socket;
- this.timeout = timeout;
- _remoteSocketAddress = socket.socket().getRemoteSocketAddress().toString();
-
-
- try
- {
- //Create but deliberately don't start the thread.
- senderThread = Threading.getThreadFactory().createThread(this);
- }
- catch(Exception e)
- {
- throw new Error("Error creating IOSender thread",e);
- }
-
- senderThread.setDaemon(true);
- senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
- }
-
- public void initiate()
- {
- senderThread.start();
- }
-
-
- public void send(ByteBuffer buf)
- {
- checkNotAlreadyClosed();
-
- if(!senderThread.isAlive())
- {
- throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress));
- }
-
-
- _buffers.add(buf);
- synchronized (_isEmpty)
- {
- _isEmpty.notifyAll();
- }
-
- }
-
- public void flush()
- {
- if (idle)
- {
- synchronized (_isEmpty)
- {
- _isEmpty.notify();
- }
- }
- }
-
- public void close()
- {
- close(true, true);
- }
-
- private void close(boolean awaitSenderBeforeClose, boolean reportException)
- {
- if (!closed.getAndSet(true))
- {
-
- synchronized (_isEmpty)
- {
- _isEmpty.notify();
- }
-
- try
- {
- if (awaitSenderBeforeClose)
- {
- awaitSenderThreadShutdown();
- }
- }
- finally
- {
- closeReceiver();
- }
- if (reportException && exception != null)
- {
- throw new SenderException(exception);
- }
- }
- }
-
- private void closeReceiver()
- {
- if(_receiver != null)
- {
- try
- {
- _receiver.close();
- }
- catch(RuntimeException e)
- {
- log.error(e, "Exception closing receiver for socket %s", _remoteSocketAddress);
- throw new SenderException(e.getMessage(), e);
- }
- }
- }
-
- public void run()
- {
- while (true)
- {
-
- if (closed.get())
- {
- break;
- }
-
- idle = true;
-
- synchronized (_isEmpty)
- {
- while (_buffers.isEmpty() && !closed.get())
- {
- try
- {
- _isEmpty.wait();
- }
- catch (InterruptedException e)
- {
- // pass
- }
- }
- }
-
- idle = false;
-
- ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
- Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
- for(int i = 0; i < bufArray.length; i++)
- {
- bufArray[i] = bufferIterator.next();
- }
-
- try
- {
- socket.write(bufArray);
- for(ByteBuffer buf : bufArray)
- {
- if(buf.remaining() == 0)
- {
- _buffers.poll();
- }
- else
- {
- break;
- }
- }
- }
- catch (IOException e)
- {
- log.info("Exception in thread sending to '" + _remoteSocketAddress + "': " + e);
- exception = e;
- close(false, false);
- break;
- }
-
- }
-
- }
-
- public void setIdleTimeout(int i)
- {
- try
- {
- socket.socket().setSoTimeout(i);
- }
- catch (Exception e)
- {
- throw new SenderException(e);
- }
- }
-
- public void setReceiver(NonBlockingReceiver receiver)
- {
- _receiver = receiver;
- }
-
- private void awaitSenderThreadShutdown()
- {
- if (Thread.currentThread() != senderThread)
- {
- try
- {
- senderThread.join(timeout);
- if (senderThread.isAlive())
- {
- log.error("join timed out for socket %s to stop", _remoteSocketAddress);
- throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress));
- }
- }
- catch (InterruptedException e)
- {
- log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress);
- throw new SenderException(e);
- }
- }
- }
-
- private void checkNotAlreadyClosed()
- {
- if (closed.get())
- {
- throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception);
- }
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index dfc2697c79..1f6d50e68f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -178,6 +178,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
// read as much as you can
// try to write all pending byte buffers
+
while (!_closed.get())
{
@@ -206,6 +207,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
fullyWritten
? SelectionKey.OP_READ
: (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+
}
catch (IOException e)
{
@@ -359,7 +361,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
{
_currentBuffer = ByteBuffer.allocate(_receiveBufSize);
}
- _socketChannel.read(_currentBuffer);
+ int read = _socketChannel.read(_currentBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
remaining = _currentBuffer.remaining();
if (LOGGER.isDebugEnabled())
{
@@ -378,6 +384,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
while(!_closed.get() && (read > 0 || unwrapped > 0) && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
{
read = _socketChannel.read(_netInputBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+
LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
_netInputBuffer.flip();
ByteBuffer appInputBuffer =
@@ -403,6 +414,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
{
read = _socketChannel.read(_netInputBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+
LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
if (_netInputBuffer.position() >= 6)