diff options
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java | 50 |
1 files changed, 44 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java index 32b2a82945..436082a6e8 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -39,13 +40,22 @@ public class SelectorThread extends Thread private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); - private Selector _selector; + private final Selector _selector; private final AtomicBoolean _closed = new AtomicBoolean(); private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); SelectorThread(final String name) { super("SelectorThread-"+name); + try + { + _selector = Selector.open(); + } + catch (IOException e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } } @Override @@ -56,9 +66,8 @@ public class SelectorThread extends Thread try { - try (Selector selector = Selector.open()) + try { - _selector = selector; while (!_closed.get()) { @@ -118,6 +127,10 @@ public class SelectorThread extends Thread } } + finally + { + _selector.close(); + } } catch (IOException e) { @@ -145,14 +158,20 @@ public class SelectorThread extends Thread { _closed.set(true); _selector.wakeup(); + _scheduler.close(); } private class NetworkConnectionScheduler { - public void schedule(final NonBlockingConnection connection) - { + private final ScheduledThreadPoolExecutor _executor; + private NetworkConnectionScheduler() + { + _executor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors()); + } + public void processConnection(final NonBlockingConnection connection) + { boolean closed = connection.doWork(); if (!closed) @@ -167,6 +186,25 @@ public class SelectorThread extends Thread } } } - } + public void schedule(final NonBlockingConnection connection) + { + _executor.submit(new Runnable() + { + @Override + public void run() + { + processConnection(connection); + } + }); + } + + public void close() + { + _executor.shutdown(); + } + + + + } } |
