diff options
| author | Keith Wall <kwall@apache.org> | 2014-12-12 15:21:50 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-12-12 15:21:50 +0000 |
| commit | f5c07292ad4754608b8f9876241baac671fdce3b (patch) | |
| tree | fabf69b2c67421e6d078f318f0289ee0f4b50bfc /qpid/java | |
| parent | 18e69f08d213313070f69074c699d0f70ba267cf (diff) | |
| download | qpid-python-f5c07292ad4754608b8f9876241baac671fdce3b.tar.gz | |
Reduce logging, use bytebuffers on the read path in a more optimal fashsion.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644928 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
3 files changed, 46 insertions, 50 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index b52b59aa15..467115c76f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -78,7 +78,7 @@ final class IoReceiver implements Runnable throw new RuntimeException("Error creating IOReceiver thread",e); } receiverThread.setDaemon(true); - receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + receiverThread.setName(String.format("IoReceiver-%s", socket.getRemoteSocketAddress())); } public void initiate() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index e06782c58a..eef418df79 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -88,7 +88,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } senderThread.setDaemon(true); - senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress)); + senderThread.setName(String.format("IoSender-%s", _remoteSocketAddress)); } public void initiate() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index 8f96baff94..d420b6b3a0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -53,6 +53,7 @@ import org.apache.qpid.transport.network.security.ssl.SSLUtil; public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> { private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class); + public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; private final SocketChannel _socketChannel; private final Selector _selector; @@ -118,7 +119,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> { _sslEngine.setWantClientAuth(true); } - _netInputBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); + _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2)); } try @@ -139,11 +140,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> } catch(Exception e) { - throw new SenderException("Error creating SenderReceiver thread for " + _remoteSocketAddress, e); + throw new SenderException("Error creating NonBlockingSenderReceiver thread for " + _remoteSocketAddress, e); } _ioThread.setDaemon(true); - _ioThread.setName(String.format("IoSenderReceiver - %s", _remoteSocketAddress)); + _ioThread.setName(String.format("NonBlockingSenderReceiver-%s", _remoteSocketAddress)); } @@ -173,14 +174,8 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> @Override public void run() { - LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started"); - // never ending loop doing - // try to write all pending byte buffers, handle situation where zero bytes or part of a byte buffer is written - // read as much as you can - // try to write all pending byte buffers - while (!_closed.get()) { @@ -194,14 +189,10 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> tick = _ticker.tick(currentTime); } - LOGGER.debug("Tick " + tick); - - int numberReady = _selector.select(tick <= 0 ? 1 : tick); + _selector.select(tick <= 0 ? 1 : tick); Set<SelectionKey> selectionKeys = _selector.selectedKeys(); selectionKeys.clear(); - LOGGER.debug("Number Ready " + numberReady); - _receiver.setTransportBlockedForWriting(!doWrite()); doRead(); boolean fullyWritten = doWrite(); @@ -230,21 +221,18 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> } catch (IOException e) { - LOGGER.info("Exception performing final output for thread '" + _remoteSocketAddress + "': " + e); + LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e); } finally { - LOGGER.info("Shutting down IO thread for " + _remoteSocketAddress); + LOGGER.debug("Shutting down IO thread for " + _remoteSocketAddress); } } - - @Override public void flush() { - // maybe just wakeup? - + _selector.wakeup(); } @Override @@ -273,7 +261,11 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> { - _socketChannel.write(bufArray); + long written = _socketChannel.write(bufArray); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " bytes"); + } for (ByteBuffer buf : bufArray) { @@ -284,10 +276,6 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> } } - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Written " + byteBuffersWritten + " byte buffer(s) completely"); - } return bufArray.length == byteBuffersWritten; } @@ -297,16 +285,13 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> do { - LOGGER.debug("Handshake status: " + _sslEngine.getHandshakeStatus()); if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) { final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize()); _status = _sslEngine.wrap(bufArray, netBuffer); - LOGGER.debug("Status: " + _status.getStatus() + " HandshakeStatus " + _status.getHandshakeStatus()); runSSLEngineTasks(_status); netBuffer.flip(); - LOGGER.debug("Encrypted " + netBuffer.remaining() + " bytes for output"); remaining = netBuffer.remaining(); if (remaining != 0) { @@ -327,7 +312,10 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); long written = _socketChannel.write(encryptedBuffers); - LOGGER.debug("Written " + written + " encrypted bytes"); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Written " + written + " encrypted bytes"); + } ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator(); while(iter.hasNext()) @@ -348,7 +336,6 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> } else { - // TODO - actually implement return true; } } @@ -373,7 +360,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> remaining = _currentBuffer.remaining(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)"); + LOGGER.debug("Read " + read + " byte(s)"); } ByteBuffer dup = _currentBuffer.duplicate(); dup.flip(); @@ -384,8 +371,7 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> else if(_transportEncryption == TransportEncryption.TLS) { int read = 1; - int unwrapped = 0; - while(!_closed.get() && (read > 0 || unwrapped > 0) && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED)) + while(!_closed.get() && read > 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED)) { read = _socketChannel.read(_netInputBuffer); if (read == -1) @@ -393,22 +379,30 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> _closed.set(true); } - LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer); + } _netInputBuffer.flip(); - ByteBuffer appInputBuffer = - ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50); - _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer); - LOGGER.debug("Status: " +_status.getStatus() + " HandshakeStatus " + _status.getHandshakeStatus()); - _netInputBuffer.compact(); - appInputBuffer.flip(); - unwrapped = appInputBuffer.remaining(); - LOGGER.debug("Unwrapped to " + unwrapped + " bytes"); + int unwrapped = 0; + do + { + ByteBuffer appInputBuffer = + ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50); - _receiver.received(appInputBuffer); + _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer); + runSSLEngineTasks(_status); + + appInputBuffer.flip(); + unwrapped = appInputBuffer.remaining(); + _receiver.received(appInputBuffer); + } + while(unwrapped > 0); + + _netInputBuffer.compact(); - runSSLEngineTasks(_status); } } else @@ -423,12 +417,15 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> _closed.set(true); } - LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer); + } - if (_netInputBuffer.position() >= 6) + if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK) { _netInputBuffer.flip(); - final byte[] headerBytes = new byte[6]; + final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK]; ByteBuffer dup = _netInputBuffer.duplicate(); dup.get(headerBytes); @@ -458,7 +455,6 @@ public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer> Runnable task; while((task = _sslEngine.getDelegatedTask()) != null) { - LOGGER.debug("Running task"); task.run(); } } |
