summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-12-12 15:21:50 +0000
committerKeith Wall <kwall@apache.org>2014-12-12 15:21:50 +0000
commitf5c07292ad4754608b8f9876241baac671fdce3b (patch)
treefabf69b2c67421e6d078f318f0289ee0f4b50bfc /qpid/java
parent18e69f08d213313070f69074c699d0f70ba267cf (diff)
downloadqpid-python-f5c07292ad4754608b8f9876241baac671fdce3b.tar.gz
Reduce logging, use bytebuffers on the read path in a more optimal fashsion.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644928 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java92
3 files changed, 46 insertions, 50 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index b52b59aa15..467115c76f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -78,7 +78,7 @@ final class IoReceiver implements Runnable
throw new RuntimeException("Error creating IOReceiver thread",e);
}
receiverThread.setDaemon(true);
- receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ receiverThread.setName(String.format("IoReceiver-%s", socket.getRemoteSocketAddress()));
}
public void initiate()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index e06782c58a..eef418df79 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -88,7 +88,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
senderThread.setDaemon(true);
- senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
+ senderThread.setName(String.format("IoSender-%s", _remoteSocketAddress));
}
public void initiate()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index 8f96baff94..d420b6b3a0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -53,6 +53,7 @@ import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
+ public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
private final SocketChannel _socketChannel;
private final Selector _selector;
@@ -118,7 +119,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
{
_sslEngine.setWantClientAuth(true);
}
- _netInputBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+ _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
}
try
@@ -139,11 +140,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
}
catch(Exception e)
{
- throw new SenderException("Error creating SenderReceiver thread for " + _remoteSocketAddress, e);
+ throw new SenderException("Error creating NonBlockingSenderReceiver thread for " + _remoteSocketAddress, e);
}
_ioThread.setDaemon(true);
- _ioThread.setName(String.format("IoSenderReceiver - %s", _remoteSocketAddress));
+ _ioThread.setName(String.format("NonBlockingSenderReceiver-%s", _remoteSocketAddress));
}
@@ -173,14 +174,8 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
@Override
public void run()
{
-
LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started");
- // never ending loop doing
- // try to write all pending byte buffers, handle situation where zero bytes or part of a byte buffer is written
- // read as much as you can
- // try to write all pending byte buffers
-
while (!_closed.get())
{
@@ -194,14 +189,10 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
tick = _ticker.tick(currentTime);
}
- LOGGER.debug("Tick " + tick);
-
- int numberReady = _selector.select(tick <= 0 ? 1 : tick);
+ _selector.select(tick <= 0 ? 1 : tick);
Set<SelectionKey> selectionKeys = _selector.selectedKeys();
selectionKeys.clear();
- LOGGER.debug("Number Ready " + numberReady);
-
_receiver.setTransportBlockedForWriting(!doWrite());
doRead();
boolean fullyWritten = doWrite();
@@ -230,21 +221,18 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
}
catch (IOException e)
{
- LOGGER.info("Exception performing final output for thread '" + _remoteSocketAddress + "': " + e);
+ LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
}
finally
{
- LOGGER.info("Shutting down IO thread for " + _remoteSocketAddress);
+ LOGGER.debug("Shutting down IO thread for " + _remoteSocketAddress);
}
}
-
-
@Override
public void flush()
{
- // maybe just wakeup?
-
+ _selector.wakeup();
}
@Override
@@ -273,7 +261,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
{
- _socketChannel.write(bufArray);
+ long written = _socketChannel.write(bufArray);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + written + " bytes");
+ }
for (ByteBuffer buf : bufArray)
{
@@ -284,10 +276,6 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
}
}
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Written " + byteBuffersWritten + " byte buffer(s) completely");
- }
return bufArray.length == byteBuffersWritten;
}
@@ -297,16 +285,13 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
do
{
- LOGGER.debug("Handshake status: " + _sslEngine.getHandshakeStatus());
if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
{
final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
_status = _sslEngine.wrap(bufArray, netBuffer);
- LOGGER.debug("Status: " + _status.getStatus() + " HandshakeStatus " + _status.getHandshakeStatus());
runSSLEngineTasks(_status);
netBuffer.flip();
- LOGGER.debug("Encrypted " + netBuffer.remaining() + " bytes for output");
remaining = netBuffer.remaining();
if (remaining != 0)
{
@@ -327,7 +312,10 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
long written = _socketChannel.write(encryptedBuffers);
- LOGGER.debug("Written " + written + " encrypted bytes");
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + written + " encrypted bytes");
+ }
ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
while(iter.hasNext())
@@ -348,7 +336,6 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
}
else
{
- // TODO - actually implement
return true;
}
}
@@ -373,7 +360,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
remaining = _currentBuffer.remaining();
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)");
+ LOGGER.debug("Read " + read + " byte(s)");
}
ByteBuffer dup = _currentBuffer.duplicate();
dup.flip();
@@ -384,8 +371,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
else if(_transportEncryption == TransportEncryption.TLS)
{
int read = 1;
- int unwrapped = 0;
- while(!_closed.get() && (read > 0 || unwrapped > 0) && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
+ while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
{
read = _socketChannel.read(_netInputBuffer);
if (read == -1)
@@ -393,22 +379,30 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
_closed.set(true);
}
- LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
+ }
_netInputBuffer.flip();
- ByteBuffer appInputBuffer =
- ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
- _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
- LOGGER.debug("Status: " +_status.getStatus() + " HandshakeStatus " + _status.getHandshakeStatus());
- _netInputBuffer.compact();
- appInputBuffer.flip();
- unwrapped = appInputBuffer.remaining();
- LOGGER.debug("Unwrapped to " + unwrapped + " bytes");
+ int unwrapped = 0;
+ do
+ {
+ ByteBuffer appInputBuffer =
+ ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
- _receiver.received(appInputBuffer);
+ _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
+ runSSLEngineTasks(_status);
+
+ appInputBuffer.flip();
+ unwrapped = appInputBuffer.remaining();
+ _receiver.received(appInputBuffer);
+ }
+ while(unwrapped > 0);
+
+ _netInputBuffer.compact();
- runSSLEngineTasks(_status);
}
}
else
@@ -423,12 +417,15 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
_closed.set(true);
}
- LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
+ }
- if (_netInputBuffer.position() >= 6)
+ if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
{
_netInputBuffer.flip();
- final byte[] headerBytes = new byte[6];
+ final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
ByteBuffer dup = _netInputBuffer.duplicate();
dup.get(headerBytes);
@@ -458,7 +455,6 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
Runnable task;
while((task = _sslEngine.getDelegatedTask()) != null)
{
- LOGGER.debug("Running task");
task.run();
}
}