summaryrefslogtreecommitdiff
path: root/qpid/java/common/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-12-11 13:08:36 +0000
committerKeith Wall <kwall@apache.org>2014-12-11 13:08:36 +0000
commitd558eca0174f9778af78f2906fef580fc8b8e294 (patch)
treea19a932e7d5d6efa3ed21f1636dd6c73367ec0a2 /qpid/java/common/src
parentebc8dd1be3244a6c6bc24f74dcf827abd5e0ceaa (diff)
downloadqpid-python-d558eca0174f9778af78f2906fef580fc8b8e294.tar.gz
Ensure that the NonBlockingSenderReceiver closes (and thread stops) if the peer closes the connection. Remove some more dead code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java30
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java266
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java272
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java18
5 files changed, 26 insertions, 561 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
index c361dc7df6..9d0fe5ddf6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
@@ -49,6 +49,7 @@ import org.apache.qpid.transport.network.TransportActivity;
import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+// TODO we are no longer using the IncomingNetworkTransport
public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
{
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
index c2635a8dfa..372e1bc5fd 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -49,7 +49,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
- private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private AcceptingThread _acceptor;
@@ -70,24 +70,11 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
public void close()
{
-/*
- if(_connection != null)
- {
- _connection.close();
- }
-*/
if(_acceptor != null)
{
_acceptor.close();
}
}
-/*
-
- public NetworkConnection getConnection()
- {
- return _connection;
- }
-*/
public void accept(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
@@ -115,10 +102,10 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
{
private final Set<TransportEncryption> _encryptionSet;
private volatile boolean _closed = false;
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext;
- private ServerSocketChannel _serverSocket;
+ private final NetworkTransportConfiguration _config;
+ private final ProtocolEngineFactory _factory;
+ private final SSLContext _sslContext;
+ private final ServerSocketChannel _serverSocket;
private int _timeout;
private AcceptingThread(NetworkTransportConfiguration config,
@@ -184,7 +171,7 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
if(engine != null)
{
socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
- socket.socket().setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+ socket.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
final Integer sendBufferSize = _config.getSendBufferSize();
final Integer receiveBufferSize = _config.getReceiveBufferSize();
@@ -216,12 +203,11 @@ public class NonBlockingNetworkTransport implements IncomingNetworkTransport
}
});
- connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+ engine.setNetworkConnection(connection, connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
ticker.setConnection(connection);
- engine.setNetworkConnection(connection, connection.getSender());
-
connection.start();
}
else
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java
deleted file mode 100644
index ccd7170b62..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.io;
-
-import java.io.IOException;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.ssl.SSLSocket;
-
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.Ticker;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.util.SystemUtils;
-
-/**
- * IoReceiver
- *
- */
-
-final class NonBlockingReceiver implements Runnable
-{
-
- private static final Logger log = Logger.get(NonBlockingReceiver.class);
-
- private final Receiver<ByteBuffer> receiver;
- private final int bufferSize;
- private final SocketChannel socket;
- private final long timeout;
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Thread receiverThread;
- private static final boolean shutdownBroken;
-
- private Ticker _ticker;
- static
- {
- shutdownBroken = SystemUtils.isWindows();
- }
-
- public NonBlockingReceiver(SocketChannel socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
- {
- this.receiver = receiver;
- this.bufferSize = bufferSize;
- this.socket = socket;
- this.timeout = timeout;
-
- try
- {
- //Create but deliberately don't start the thread.
- receiverThread = Threading.getThreadFactory().createThread(this);
- }
- catch(Exception e)
- {
- throw new RuntimeException("Error creating IOReceiver thread",e);
- }
- receiverThread.setDaemon(true);
- receiverThread.setName(String.format("IoReceiver - %s", socket.socket().getRemoteSocketAddress()));
- }
-
- public void initiate()
- {
- receiverThread.start();
- }
-
- public void close()
- {
- close(false);
- }
-
- void close(boolean block)
- {
- if (!closed.getAndSet(true))
- {
- try
- {
- try
- {
- if (shutdownBroken || socket.socket() instanceof SSLSocket)
- {
- socket.close();
- }
- else
- {
- socket.shutdownInput();
- }
- }
- catch(SocketException se)
- {
- if(!socket.socket().isClosed() && !socket.socket().isInputShutdown())
- {
- throw se;
- }
- }
- if (block && Thread.currentThread() != receiverThread)
- {
- receiverThread.join(timeout);
- if (receiverThread.isAlive())
- {
- throw new TransportException("join timed out");
- }
- }
- }
- catch (InterruptedException e)
- {
- throw new TransportException(e);
- }
- catch (IOException e)
- {
- throw new TransportException(e);
- }
-
- }
- }
-
- public void run()
- {
- final int threshold = bufferSize / 2;
-
- // I set the read buffer size similar to SO_RCVBUF
- // Haven't tested with a lower value to see if it's better or worse
- ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
- try
- {
- int read = 0;
- long currentTime;
- while(read != -1)
- {
- try
- {
- while ((read = socket.read(buffer)) != -1)
- {
- if (read > 0)
- {
- ByteBuffer b = buffer.duplicate();
- b.flip();
- receiver.received(b);
-
- if (buffer.remaining() < threshold)
- {
- buffer = ByteBuffer.allocate(bufferSize);
- }
- else
- {
- buffer = buffer.slice();
- }
- }
- currentTime = System.currentTimeMillis();
-
- if(_ticker != null)
- {
- int tick = _ticker.getTimeToNextTick(currentTime);
- if(tick <= 0)
- {
- tick = _ticker.tick(currentTime);
- }
- try
- {
- if(!socket.socket().isClosed())
- {
- socket.socket().setSoTimeout(tick <= 0 ? 1 : tick);
- }
- }
- catch(SocketException e)
- {
- // ignore - closed socket
- }
- }
- }
- }
- catch (SocketTimeoutException e)
- {
- currentTime = System.currentTimeMillis();
- if(_ticker != null)
- {
- final int tick = _ticker.tick(currentTime);
- if(!socket.socket().isClosed())
- {
- try
- {
- socket.socket().setSoTimeout(tick <= 0 ? 1 : tick );
- }
- catch(SocketException ex)
- {
- // ignore - closed socket
- }
- }
- }
- }
- }
- }
- catch (Exception t)
- {
- if (shouldReport(t))
- {
- receiver.exception(t);
- }
- }
- finally
- {
- receiver.closed();
- try
- {
- socket.close();
- }
- catch(Exception e)
- {
- log.warn(e, "Error closing socket");
- }
- }
- }
-
- private boolean shouldReport(Throwable t)
- {
- boolean brokenClose = closed.get() &&
- shutdownBroken &&
- t instanceof SocketException &&
- "socket closed".equalsIgnoreCase(t.getMessage());
-
- boolean sslSocketClosed = closed.get() &&
- socket.socket() instanceof SSLSocket &&
- t instanceof SocketException &&
- "Socket is closed".equalsIgnoreCase(t.getMessage());
-
- boolean recvFailed = closed.get() &&
- shutdownBroken &&
- t instanceof SocketException &&
- "Socket operation on nonsocket: recv failed".equalsIgnoreCase(t.getMessage());
-
- return !brokenClose && !sslSocketClosed && !recvFailed;
- }
-
- public Ticker getTicker()
- {
- return _ticker;
- }
-
- public void setTicker(Ticker ticker)
- {
- _ticker = ticker;
- }
-
-
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java
deleted file mode 100644
index e3d604d2ba..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.transport.network.io;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderClosedException;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.util.Logger;
-
-
-public final class NonBlockingSender implements Runnable, Sender<ByteBuffer>
-{
-
- private static final Logger log = Logger.get(NonBlockingSender.class);
-
- // by starting here, we ensure that we always test the wraparound
- // case, we should probably make this configurable somehow so that
- // we can test other cases as well
- private final static int START = Integer.MAX_VALUE - 10;
-
- private final long timeout;
- private final SocketChannel socket;
-
- private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
- private final Object _isEmpty = new Object();
-
- private volatile boolean idle = true;
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Thread senderThread;
- private NonBlockingReceiver _receiver;
- private final String _remoteSocketAddress;
-
- private volatile Throwable exception = null;
-
- public NonBlockingSender(SocketChannel socket, int bufferSize, long timeout)
- {
- this.socket = socket;
- this.timeout = timeout;
- _remoteSocketAddress = socket.socket().getRemoteSocketAddress().toString();
-
-
- try
- {
- //Create but deliberately don't start the thread.
- senderThread = Threading.getThreadFactory().createThread(this);
- }
- catch(Exception e)
- {
- throw new Error("Error creating IOSender thread",e);
- }
-
- senderThread.setDaemon(true);
- senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
- }
-
- public void initiate()
- {
- senderThread.start();
- }
-
-
- public void send(ByteBuffer buf)
- {
- checkNotAlreadyClosed();
-
- if(!senderThread.isAlive())
- {
- throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress));
- }
-
-
- _buffers.add(buf);
- synchronized (_isEmpty)
- {
- _isEmpty.notifyAll();
- }
-
- }
-
- public void flush()
- {
- if (idle)
- {
- synchronized (_isEmpty)
- {
- _isEmpty.notify();
- }
- }
- }
-
- public void close()
- {
- close(true, true);
- }
-
- private void close(boolean awaitSenderBeforeClose, boolean reportException)
- {
- if (!closed.getAndSet(true))
- {
-
- synchronized (_isEmpty)
- {
- _isEmpty.notify();
- }
-
- try
- {
- if (awaitSenderBeforeClose)
- {
- awaitSenderThreadShutdown();
- }
- }
- finally
- {
- closeReceiver();
- }
- if (reportException && exception != null)
- {
- throw new SenderException(exception);
- }
- }
- }
-
- private void closeReceiver()
- {
- if(_receiver != null)
- {
- try
- {
- _receiver.close();
- }
- catch(RuntimeException e)
- {
- log.error(e, "Exception closing receiver for socket %s", _remoteSocketAddress);
- throw new SenderException(e.getMessage(), e);
- }
- }
- }
-
- public void run()
- {
- while (true)
- {
-
- if (closed.get())
- {
- break;
- }
-
- idle = true;
-
- synchronized (_isEmpty)
- {
- while (_buffers.isEmpty() && !closed.get())
- {
- try
- {
- _isEmpty.wait();
- }
- catch (InterruptedException e)
- {
- // pass
- }
- }
- }
-
- idle = false;
-
- ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
- Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
- for(int i = 0; i < bufArray.length; i++)
- {
- bufArray[i] = bufferIterator.next();
- }
-
- try
- {
- socket.write(bufArray);
- for(ByteBuffer buf : bufArray)
- {
- if(buf.remaining() == 0)
- {
- _buffers.poll();
- }
- else
- {
- break;
- }
- }
- }
- catch (IOException e)
- {
- log.info("Exception in thread sending to '" + _remoteSocketAddress + "': " + e);
- exception = e;
- close(false, false);
- break;
- }
-
- }
-
- }
-
- public void setIdleTimeout(int i)
- {
- try
- {
- socket.socket().setSoTimeout(i);
- }
- catch (Exception e)
- {
- throw new SenderException(e);
- }
- }
-
- public void setReceiver(NonBlockingReceiver receiver)
- {
- _receiver = receiver;
- }
-
- private void awaitSenderThreadShutdown()
- {
- if (Thread.currentThread() != senderThread)
- {
- try
- {
- senderThread.join(timeout);
- if (senderThread.isAlive())
- {
- log.error("join timed out for socket %s to stop", _remoteSocketAddress);
- throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress));
- }
- }
- catch (InterruptedException e)
- {
- log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress);
- throw new SenderException(e);
- }
- }
- }
-
- private void checkNotAlreadyClosed()
- {
- if (closed.get())
- {
- throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception);
- }
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index dfc2697c79..1f6d50e68f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -178,6 +178,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
// read as much as you can
// try to write all pending byte buffers
+
while (!_closed.get())
{
@@ -206,6 +207,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
fullyWritten
? SelectionKey.OP_READ
: (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+
}
catch (IOException e)
{
@@ -359,7 +361,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
{
_currentBuffer = ByteBuffer.allocate(_receiveBufSize);
}
- _socketChannel.read(_currentBuffer);
+ int read = _socketChannel.read(_currentBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
remaining = _currentBuffer.remaining();
if (LOGGER.isDebugEnabled())
{
@@ -378,6 +384,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
while(!_closed.get() && (read > 0 || unwrapped > 0) && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
{
read = _socketChannel.read(_netInputBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+
LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
_netInputBuffer.flip();
ByteBuffer appInputBuffer =
@@ -403,6 +414,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
{
read = _socketChannel.read(_netInputBuffer);
+ if (read == -1)
+ {
+ _closed.set(true);
+ }
+
LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
if (_netInputBuffer.position() >= 6)