diff options
Diffstat (limited to 'qpid/java/common/src')
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java | 32 |
1 files changed, 14 insertions, 18 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index df1ffac6b7..3bc5abf27b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -20,8 +20,6 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; -import java.net.SocketException; -import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -56,6 +54,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> private final int _receiveBufSize; private final Ticker _ticker; + private ByteBuffer _currentBuffer; public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver<ByteBuffer> receiver, int receiveBufSize, Ticker ticker) @@ -151,20 +150,13 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> } } - try + try(Selector selector = _selector; SocketChannel channel = _socketChannel) { while(!doWrite()) { } - try - { - _receiver.closed(); - } - finally - { - _socketChannel.close(); - } + _receiver.closed(); } catch (IOException e) { @@ -224,19 +216,23 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> private void doRead() throws IOException { - ByteBuffer buffer; int remaining; do { - buffer = ByteBuffer.allocate(_receiveBufSize); - _socketChannel.read(buffer); - remaining = buffer.remaining(); + if(_currentBuffer == null || _currentBuffer.remaining() == 0) + { + _currentBuffer = ByteBuffer.allocate(_receiveBufSize); + } + _socketChannel.read(_currentBuffer); + remaining = _currentBuffer.remaining(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Read " + buffer.position() + " byte(s)"); + LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)"); } - buffer.flip(); - _receiver.received(buffer); + ByteBuffer dup = _currentBuffer.duplicate(); + dup.flip(); + _currentBuffer = _currentBuffer.slice(); + _receiver.received(dup); } while (remaining == 0); |
