diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-12-09 10:00:24 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-12-09 10:00:24 +0000 |
| commit | 5a53d804f8b548cd4f4829ca8322f76e7f5d8767 (patch) | |
| tree | 56c5d40b8eb9343be80f18217e66d391b9c99b71 | |
| parent | d73645fe7be57f2eae7889f42f6858b13c5573af (diff) | |
| download | qpid-python-5a53d804f8b548cd4f4829ca8322f76e7f5d8767.tar.gz | |
Ensure selector is closed, continue to use same backing buffer on read until it is full
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644014 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java | 32 |
1 files changed, 14 insertions, 18 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index df1ffac6b7..3bc5abf27b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -20,8 +20,6 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; -import java.net.SocketException; -import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -56,6 +54,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> private final int _receiveBufSize; private final Ticker _ticker; + private ByteBuffer _currentBuffer; public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver<ByteBuffer> receiver, int receiveBufSize, Ticker ticker) @@ -151,20 +150,13 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> } } - try + try(Selector selector = _selector; SocketChannel channel = _socketChannel) { while(!doWrite()) { } - try - { - _receiver.closed(); - } - finally - { - _socketChannel.close(); - } + _receiver.closed(); } catch (IOException e) { @@ -224,19 +216,23 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> private void doRead() throws IOException { - ByteBuffer buffer; int remaining; do { - buffer = ByteBuffer.allocate(_receiveBufSize); - _socketChannel.read(buffer); - remaining = buffer.remaining(); + if(_currentBuffer == null || _currentBuffer.remaining() == 0) + { + _currentBuffer = ByteBuffer.allocate(_receiveBufSize); + } + _socketChannel.read(_currentBuffer); + remaining = _currentBuffer.remaining(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Read " + buffer.position() + " byte(s)"); + LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)"); } - buffer.flip(); - _receiver.received(buffer); + ByteBuffer dup = _currentBuffer.duplicate(); + dup.flip(); + _currentBuffer = _currentBuffer.slice(); + _receiver.received(dup); } while (remaining == 0); |
