diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-01-29 12:59:50 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-01-29 12:59:50 +0000 |
| commit | 522f8bba207388d22a93ac3a5190a15219f1b1e3 (patch) | |
| tree | c74da0ad0cc51c115c8f4e3434bcd35d68df926c /qpid/java | |
| parent | 8c56534c020229c3ba4d5f32c7ff406fd2ffafe5 (diff) | |
| download | qpid-python-522f8bba207388d22a93ac3a5190a15219f1b1e3.tar.gz | |
Use a threadpool for the scheduler
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1655641 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java | 50 |
1 files changed, 44 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java index 32b2a82945..436082a6e8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -39,13 +40,22 @@ public class SelectorThread extends Thread private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); - private Selector _selector; + private final Selector _selector; private final AtomicBoolean _closed = new AtomicBoolean(); private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); SelectorThread(final String name) { super("SelectorThread-"+name); + try + { + _selector = Selector.open(); + } + catch (IOException e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } } @Override @@ -56,9 +66,8 @@ public class SelectorThread extends Thread try { - try (Selector selector = Selector.open()) + try { - _selector = selector; while (!_closed.get()) { @@ -118,6 +127,10 @@ public class SelectorThread extends Thread } } + finally + { + _selector.close(); + } } catch (IOException e) { @@ -145,14 +158,20 @@ public class SelectorThread extends Thread { _closed.set(true); _selector.wakeup(); + _scheduler.close(); } private class NetworkConnectionScheduler { - public void schedule(final NonBlockingConnection connection) - { + private final ScheduledThreadPoolExecutor _executor; + private NetworkConnectionScheduler() + { + _executor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors()); + } + public void processConnection(final NonBlockingConnection connection) + { boolean closed = connection.doWork(); if (!closed) @@ -167,6 +186,25 @@ public class SelectorThread extends Thread } } } - } + public void schedule(final NonBlockingConnection connection) + { + _executor.submit(new Runnable() + { + @Override + public void run() + { + processConnection(connection); + } + }); + } + + public void close() + { + _executor.shutdown(); + } + + + + } } |
