summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-01-29 19:28:45 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-01-29 19:28:45 +0000
commite0cc2812fcfd4c83bc7a1e10e57c81864f67450d (patch)
tree44148e81c795d7a07dc197b125c6ef0a96d5397b /qpid/java
parent522f8bba207388d22a93ac3a5190a15219f1b1e3 (diff)
downloadqpid-python-e0cc2812fcfd4c83bc7a1e10e57c81864f67450d.tar.gz
Correctly deregister and also optimise case where work can be rescheduled on same thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1655825 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java82
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java145
2 files changed, 107 insertions, 120 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
index 151af64b67..73875e5bce 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -165,11 +165,11 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
}
_receiver.setTransportBlockedForWriting(!doWrite());
- doRead();
+ boolean dataRead = doRead();
_fullyWritten = doWrite();
_receiver.setTransportBlockedForWriting(!_fullyWritten);
- if(_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)
+ if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0))
{
_stateChanged.set(true);
}
@@ -202,64 +202,6 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
}
-
-/* public void run()
- {
- LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started");
-
-
- while (!_closed.get())
- {
-
- try
- {
- long currentTime = System.currentTimeMillis();
- int tick = _ticker.getTimeToNextTick(currentTime);
- if(tick <= 0)
- {
- tick = _ticker.tick(currentTime);
- }
-
- _selector.select(tick <= 0 ? 1 : tick);
- Set<SelectionKey> selectionKeys = _selector.selectedKeys();
- selectionKeys.clear();
-
- _receiver.setTransportBlockedForWriting(!doWrite());
- doRead();
- _fullyWritten = doWrite();
- _receiver.setTransportBlockedForWriting(!_fullyWritten);
-
- _socketChannel.register(_selector,
- _fullyWritten
- ? SelectionKey.OP_READ
- : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
-
- }
- catch (IOException e)
- {
- LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
- close();
- }
- }
-
- try(Selector selector = _selector; SocketChannel channel = _socketChannel)
- {
- while(!doWrite())
- {
- }
-
- _receiver.closed();
- }
- catch (IOException e)
- {
- LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
- }
- finally
- {
- LOGGER.debug("Shutting down IO thread for " + _remoteSocketAddress);
- }
- }*/
-
@Override
public void flush()
{
@@ -368,9 +310,9 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
}
}
- private void doRead() throws IOException
+ private boolean doRead() throws IOException
{
-
+ boolean readData = false;
if(_transportEncryption == TransportEncryption.NONE)
{
int remaining = 0;
@@ -381,6 +323,10 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
_currentBuffer = ByteBuffer.allocate(_receiveBufSize);
}
int read = _socketChannel.read(_currentBuffer);
+ if(read > 0)
+ {
+ readData = true;
+ }
if (read == -1)
{
_closed.set(true);
@@ -406,7 +352,10 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
{
_closed.set(true);
}
-
+ else if(read > 0)
+ {
+ readData = true;
+ }
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Read " + read + " encrypted bytes ");
@@ -427,6 +376,10 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
appInputBuffer.flip();
unwrapped = appInputBuffer.remaining();
+ if(unwrapped > 0)
+ {
+ readData = true;
+ }
_receiver.received(appInputBuffer);
}
while(unwrapped > 0 || tasksRun);
@@ -470,12 +423,13 @@ public class NonBlockingSenderReceiver implements Sender<ByteBuffer>
{
_onTransportEncryptionAction.run();
_netInputBuffer.compact();
- doRead();
+ readData = doRead();
}
break;
}
}
}
+ return readData;
}
private boolean runSSLEngineTasks(final SSLEngineResult status)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
index 436082a6e8..ff89d9b05c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by keith on 28/01/2015.
@@ -66,70 +67,64 @@ public class SelectorThread extends Thread
try
{
- try
+ while (!_closed.get())
{
- while (!_closed.get())
- {
- _selector.select(nextTimeout);
+ _selector.select(nextTimeout);
- List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
- Set<SelectionKey> selectionKeys = _selector.selectedKeys();
- for (SelectionKey key : selectionKeys)
- {
- NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectionKeys)
+ {
+ NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
- key.channel().register(_selector, 0);
+ key.channel().register(_selector, 0);
- toBeScheduled.add(connection);
- _unscheduledConnections.remove(connection);
+ toBeScheduled.add(connection);
+ _unscheduledConnections.remove(connection);
- }
- selectionKeys.clear();
+ }
+ selectionKeys.clear();
- while (_unregisteredConnections.peek() != null)
- {
- NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
- _unscheduledConnections.add(unregisteredConnection);
+ while (_unregisteredConnections.peek() != null)
+ {
+ NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
+ _unscheduledConnections.add(unregisteredConnection);
- final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
- | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
- unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+ final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+ | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+ unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
- }
+ }
- long currentTime = System.currentTimeMillis();
- Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
- nextTimeout = Integer.MAX_VALUE;
- while (iterator.hasNext())
- {
- NonBlockingConnection connection = iterator.next();
+ long currentTime = System.currentTimeMillis();
+ Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
+ nextTimeout = Integer.MAX_VALUE;
+ while (iterator.hasNext())
+ {
+ NonBlockingConnection connection = iterator.next();
- int period = connection.getTicker().getTimeToNextTick(currentTime);
- if (period < 0 || connection.isStateChanged())
- {
- toBeScheduled.add(connection);
- iterator.remove();
- }
- else
- {
- nextTimeout = Math.min(period, nextTimeout);
- }
+ int period = connection.getTicker().getTimeToNextTick(currentTime);
+ if (period < 0 || connection.isStateChanged())
+ {
+ toBeScheduled.add(connection);
+ connection.getSocketChannel().register(_selector, 0).cancel();
+ iterator.remove();
}
-
- for (NonBlockingConnection connection : toBeScheduled)
+ else
{
- _scheduler.schedule(connection);
+ nextTimeout = Math.min(period, nextTimeout);
}
+ }
+ for (NonBlockingConnection connection : toBeScheduled)
+ {
+ _scheduler.schedule(connection);
}
- }
- finally
- {
- _selector.close();
+
}
}
catch (IOException e)
@@ -137,6 +132,18 @@ public class SelectorThread extends Thread
//TODO
e.printStackTrace();
}
+ finally
+ {
+ try
+ {
+ _selector.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
@@ -164,26 +171,52 @@ public class SelectorThread extends Thread
private class NetworkConnectionScheduler
{
private final ScheduledThreadPoolExecutor _executor;
+ private final AtomicInteger _running = new AtomicInteger();
+ private final int _poolSize;
private NetworkConnectionScheduler()
{
- _executor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());
+ _poolSize = Runtime.getRuntime().availableProcessors();
+ _executor = new ScheduledThreadPoolExecutor(_poolSize);
+ _executor.prestartAllCoreThreads();
}
public void processConnection(final NonBlockingConnection connection)
{
- boolean closed = connection.doWork();
-
- if (!closed)
+ try
{
- if (connection.isStateChanged())
+ _running.incrementAndGet();
+ boolean rerun;
+ do
{
- schedule(connection);
- }
- else
- {
- SelectorThread.this.addConnection(connection);
- }
+ rerun = false;
+ boolean closed = connection.doWork();
+
+ if (!closed)
+ {
+
+ if (connection.isStateChanged())
+ {
+ if (_running.get() == _poolSize)
+ {
+ schedule(connection);
+ }
+ else
+ {
+ rerun = true;
+ }
+ }
+ else
+ {
+ SelectorThread.this.addConnection(connection);
+ }
+ }
+
+ } while (rerun);
+ }
+ finally
+ {
+ _running.decrementAndGet();
}
}