summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-01-29 12:59:50 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-01-29 12:59:50 +0000
commit522f8bba207388d22a93ac3a5190a15219f1b1e3 (patch)
treec74da0ad0cc51c115c8f4e3434bcd35d68df926c /qpid/java
parent8c56534c020229c3ba4d5f32c7ff406fd2ffafe5 (diff)
downloadqpid-python-522f8bba207388d22a93ac3a5190a15219f1b1e3.tar.gz
Use a threadpool for the scheduler
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1655641 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java50
1 files changed, 44 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
index 32b2a82945..436082a6e8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -39,13 +40,22 @@ public class SelectorThread extends Thread
private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
- private Selector _selector;
+ private final Selector _selector;
private final AtomicBoolean _closed = new AtomicBoolean();
private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
SelectorThread(final String name)
{
super("SelectorThread-"+name);
+ try
+ {
+ _selector = Selector.open();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -56,9 +66,8 @@ public class SelectorThread extends Thread
try
{
- try (Selector selector = Selector.open())
+ try
{
- _selector = selector;
while (!_closed.get())
{
@@ -118,6 +127,10 @@ public class SelectorThread extends Thread
}
}
+ finally
+ {
+ _selector.close();
+ }
}
catch (IOException e)
{
@@ -145,14 +158,20 @@ public class SelectorThread extends Thread
{
_closed.set(true);
_selector.wakeup();
+ _scheduler.close();
}
private class NetworkConnectionScheduler
{
- public void schedule(final NonBlockingConnection connection)
- {
+ private final ScheduledThreadPoolExecutor _executor;
+ private NetworkConnectionScheduler()
+ {
+ _executor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());
+ }
+ public void processConnection(final NonBlockingConnection connection)
+ {
boolean closed = connection.doWork();
if (!closed)
@@ -167,6 +186,25 @@ public class SelectorThread extends Thread
}
}
}
- }
+ public void schedule(final NonBlockingConnection connection)
+ {
+ _executor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ processConnection(connection);
+ }
+ });
+ }
+
+ public void close()
+ {
+ _executor.shutdown();
+ }
+
+
+
+ }
}