From 5a53d804f8b548cd4f4829ca8322f76e7f5d8767 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 9 Dec 2014 10:00:24 +0000 Subject: Ensure selector is closed, continue to use same backing buffer on read until it is full git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644014 13f79535-47bb-0310-9956-ffa450edef68 --- .../network/io/NonBlockingSenderReceiver.java | 32 ++++++++++------------ 1 file changed, 14 insertions(+), 18 deletions(-) (limited to 'qpid/java/common/src') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index df1ffac6b7..3bc5abf27b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -20,8 +20,6 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; -import java.net.SocketException; -import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -56,6 +54,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender private final int _receiveBufSize; private final Ticker _ticker; + private ByteBuffer _currentBuffer; public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver receiver, int receiveBufSize, Ticker ticker) @@ -151,20 +150,13 @@ public class NonBlockingSenderReceiver implements Runnable, Sender } } - try + try(Selector selector = _selector; SocketChannel channel = _socketChannel) { while(!doWrite()) { } - try - { - _receiver.closed(); - } - finally - { - _socketChannel.close(); - } + _receiver.closed(); } catch (IOException e) { @@ -224,19 +216,23 @@ public class NonBlockingSenderReceiver implements Runnable, Sender private void doRead() throws IOException { - ByteBuffer buffer; int remaining; do { - buffer = ByteBuffer.allocate(_receiveBufSize); - _socketChannel.read(buffer); - remaining = buffer.remaining(); + if(_currentBuffer == null || _currentBuffer.remaining() == 0) + { + _currentBuffer = ByteBuffer.allocate(_receiveBufSize); + } + _socketChannel.read(_currentBuffer); + remaining = _currentBuffer.remaining(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Read " + buffer.position() + " byte(s)"); + LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)"); } - buffer.flip(); - _receiver.received(buffer); + ByteBuffer dup = _currentBuffer.duplicate(); + dup.flip(); + _currentBuffer = _currentBuffer.slice(); + _receiver.received(dup); } while (remaining == 0); -- cgit v1.2.1