summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-12-09 10:00:24 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-12-09 10:00:24 +0000
commit5a53d804f8b548cd4f4829ca8322f76e7f5d8767 (patch)
tree56c5d40b8eb9343be80f18217e66d391b9c99b71
parentd73645fe7be57f2eae7889f42f6858b13c5573af (diff)
downloadqpid-python-5a53d804f8b548cd4f4829ca8322f76e7f5d8767.tar.gz
Ensure selector is closed, continue to use same backing buffer on read until it is full
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644014 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java32
1 files changed, 14 insertions, 18 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index df1ffac6b7..3bc5abf27b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -20,8 +20,6 @@
package org.apache.qpid.transport.network.io;
import java.io.IOException;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -56,6 +54,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
private final int _receiveBufSize;
private final Ticker _ticker;
+ private ByteBuffer _currentBuffer;
public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver<ByteBuffer> receiver, int receiveBufSize, Ticker ticker)
@@ -151,20 +150,13 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
}
}
- try
+ try(Selector selector = _selector; SocketChannel channel = _socketChannel)
{
while(!doWrite())
{
}
- try
- {
- _receiver.closed();
- }
- finally
- {
- _socketChannel.close();
- }
+ _receiver.closed();
}
catch (IOException e)
{
@@ -224,19 +216,23 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
private void doRead() throws IOException
{
- ByteBuffer buffer;
int remaining;
do
{
- buffer = ByteBuffer.allocate(_receiveBufSize);
- _socketChannel.read(buffer);
- remaining = buffer.remaining();
+ if(_currentBuffer == null || _currentBuffer.remaining() == 0)
+ {
+ _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
+ }
+ _socketChannel.read(_currentBuffer);
+ remaining = _currentBuffer.remaining();
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Read " + buffer.position() + " byte(s)");
+ LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)");
}
- buffer.flip();
- _receiver.received(buffer);
+ ByteBuffer dup = _currentBuffer.duplicate();
+ dup.flip();
+ _currentBuffer = _currentBuffer.slice();
+ _receiver.received(dup);
}
while (remaining == 0);