diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-01-29 19:28:45 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-01-29 19:28:45 +0000 |
| commit | e0cc2812fcfd4c83bc7a1e10e57c81864f67450d (patch) | |
| tree | 44148e81c795d7a07dc197b125c6ef0a96d5397b /qpid/java | |
| parent | 522f8bba207388d22a93ac3a5190a15219f1b1e3 (diff) | |
| download | qpid-python-e0cc2812fcfd4c83bc7a1e10e57c81864f67450d.tar.gz | |
Correctly deregister and also optimise case where work can be rescheduled on same thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1655825 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
2 files changed, 107 insertions, 120 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java index 151af64b67..73875e5bce 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java @@ -165,11 +165,11 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> } _receiver.setTransportBlockedForWriting(!doWrite()); - doRead(); + boolean dataRead = doRead(); _fullyWritten = doWrite(); _receiver.setTransportBlockedForWriting(!_fullyWritten); - if(_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0) + if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)) { _stateChanged.set(true); } @@ -202,64 +202,6 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> } - -/* public void run() - { - LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started"); - - - while (!_closed.get()) - { - - try - { - long currentTime = System.currentTimeMillis(); - int tick = _ticker.getTimeToNextTick(currentTime); - if(tick <= 0) - { - tick = _ticker.tick(currentTime); - } - - _selector.select(tick <= 0 ? 1 : tick); - Set<SelectionKey> selectionKeys = _selector.selectedKeys(); - selectionKeys.clear(); - - _receiver.setTransportBlockedForWriting(!doWrite()); - doRead(); - _fullyWritten = doWrite(); - _receiver.setTransportBlockedForWriting(!_fullyWritten); - - _socketChannel.register(_selector, - _fullyWritten - ? SelectionKey.OP_READ - : (SelectionKey.OP_WRITE | SelectionKey.OP_READ)); - - } - catch (IOException e) - { - LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); - close(); - } - } - - try(Selector selector = _selector; SocketChannel channel = _socketChannel) - { - while(!doWrite()) - { - } - - _receiver.closed(); - } - catch (IOException e) - { - LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e); - } - finally - { - LOGGER.debug("Shutting down IO thread for " + _remoteSocketAddress); - } - }*/ - @Override public void flush() { @@ -368,9 +310,9 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> } } - private void doRead() throws IOException + private boolean doRead() throws IOException { - + boolean readData = false; if(_transportEncryption == TransportEncryption.NONE) { int remaining = 0; @@ -381,6 +323,10 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> _currentBuffer = ByteBuffer.allocate(_receiveBufSize); } int read = _socketChannel.read(_currentBuffer); + if(read > 0) + { + readData = true; + } if (read == -1) { _closed.set(true); @@ -406,7 +352,10 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> { _closed.set(true); } - + else if(read > 0) + { + readData = true; + } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Read " + read + " encrypted bytes "); @@ -427,6 +376,10 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> appInputBuffer.flip(); unwrapped = appInputBuffer.remaining(); + if(unwrapped > 0) + { + readData = true; + } _receiver.received(appInputBuffer); } while(unwrapped > 0 || tasksRun); @@ -470,12 +423,13 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer> { _onTransportEncryptionAction.run(); _netInputBuffer.compact(); - doRead(); + readData = doRead(); } break; } } } + return readData; } private boolean runSSLEngineTasks(final SSLEngineResult status) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java index 436082a6e8..ff89d9b05c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by keith on 28/01/2015. @@ -66,70 +67,64 @@ public class SelectorThread extends Thread try { - try + while (!_closed.get()) { - while (!_closed.get()) - { - _selector.select(nextTimeout); + _selector.select(nextTimeout); - List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - Set<SelectionKey> selectionKeys = _selector.selectedKeys(); - for (SelectionKey key : selectionKeys) - { - NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); + Set<SelectionKey> selectionKeys = _selector.selectedKeys(); + for (SelectionKey key : selectionKeys) + { + NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); - key.channel().register(_selector, 0); + key.channel().register(_selector, 0); - toBeScheduled.add(connection); - _unscheduledConnections.remove(connection); + toBeScheduled.add(connection); + _unscheduledConnections.remove(connection); - } - selectionKeys.clear(); + } + selectionKeys.clear(); - while (_unregisteredConnections.peek() != null) - { - NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); - _unscheduledConnections.add(unregisteredConnection); + while (_unregisteredConnections.peek() != null) + { + NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); + _unscheduledConnections.add(unregisteredConnection); - final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) - | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); - unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); + final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) + | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); + unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); - } + } - long currentTime = System.currentTimeMillis(); - Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); - nextTimeout = Integer.MAX_VALUE; - while (iterator.hasNext()) - { - NonBlockingConnection connection = iterator.next(); + long currentTime = System.currentTimeMillis(); + Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); + nextTimeout = Integer.MAX_VALUE; + while (iterator.hasNext()) + { + NonBlockingConnection connection = iterator.next(); - int period = connection.getTicker().getTimeToNextTick(currentTime); - if (period < 0 || connection.isStateChanged()) - { - toBeScheduled.add(connection); - iterator.remove(); - } - else - { - nextTimeout = Math.min(period, nextTimeout); - } + int period = connection.getTicker().getTimeToNextTick(currentTime); + if (period < 0 || connection.isStateChanged()) + { + toBeScheduled.add(connection); + connection.getSocketChannel().register(_selector, 0).cancel(); + iterator.remove(); } - - for (NonBlockingConnection connection : toBeScheduled) + else { - _scheduler.schedule(connection); + nextTimeout = Math.min(period, nextTimeout); } + } + for (NonBlockingConnection connection : toBeScheduled) + { + _scheduler.schedule(connection); } - } - finally - { - _selector.close(); + } } catch (IOException e) @@ -137,6 +132,18 @@ public class SelectorThread extends Thread //TODO e.printStackTrace(); } + finally + { + try + { + _selector.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + @@ -164,26 +171,52 @@ public class SelectorThread extends Thread private class NetworkConnectionScheduler { private final ScheduledThreadPoolExecutor _executor; + private final AtomicInteger _running = new AtomicInteger(); + private final int _poolSize; private NetworkConnectionScheduler() { - _executor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors()); + _poolSize = Runtime.getRuntime().availableProcessors(); + _executor = new ScheduledThreadPoolExecutor(_poolSize); + _executor.prestartAllCoreThreads(); } public void processConnection(final NonBlockingConnection connection) { - boolean closed = connection.doWork(); - - if (!closed) + try { - if (connection.isStateChanged()) + _running.incrementAndGet(); + boolean rerun; + do { - schedule(connection); - } - else - { - SelectorThread.this.addConnection(connection); - } + rerun = false; + boolean closed = connection.doWork(); + + if (!closed) + { + + if (connection.isStateChanged()) + { + if (_running.get() == _poolSize) + { + schedule(connection); + } + else + { + rerun = true; + } + } + else + { + SelectorThread.this.addConnection(connection); + } + } + + } while (rerun); + } + finally + { + _running.decrementAndGet(); } } |
