diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2008-11-21 16:32:53 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2008-11-21 16:32:53 +0000 |
| commit | da89c7fe7cb06c3bb8c514fd31af353f3c53c978 (patch) | |
| tree | 550b5fc7d67aa6dd91486ded53312fbd95d27029 /java/client/src | |
| parent | ed96b4f0058927f0c9c2dada5b506d32390cf0ab (diff) | |
| download | qpid-python-da89c7fe7cb06c3bb8c514fd31af353f3c53c978.tar.gz | |
This is related to QPID-1479.
For starters I have changed the IoSender.java IoReceiver.java and AMQSession.java#Dispatcher to use the Thread factory to create the threads they require.
The ThreadFactory has two implimentations, the default being the java.lang.Threads.
The other is the RealtimeThreadFactory which uses reflection to create threads with a specific priority.
-Dqpid.thread_factory=<thread_factory_class> will decide which thread factory should be loaded.
-Dqpid.rt_thread_priority=<int> specifies the gloabl real time thread priority and defaults to 20.
You could also set individual thread priorities by adding the nessacery config+code changes.
I have also changed the Testkit and QpidBench to use the Thread factory so you could use them for testing/benchmarking work on RT JVMs.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@719628 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 55 |
1 files changed, 37 insertions, 18 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b5d12d9520..af0ed3faa3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -67,16 +67,26 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; -import org.apache.qpid.client.message.*; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.CloseConsumerMessage; +import org.apache.qpid.client.message.JMSBytesMessage; +import org.apache.qpid.client.message.JMSMapMessage; +import org.apache.qpid.client.message.JMSObjectMessage; +import org.apache.qpid.client.message.JMSStreamMessage; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.util.FlowControllingBlockingQueue; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; +import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -271,6 +281,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the dispatcher thread for this session. */ protected Dispatcher _dispatcher; + + protected Thread _dispatcherThread; /** Holds the message factory factory for this session. */ protected MessageFactoryRegistry _messageFactoryRegistry; @@ -668,7 +680,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcher != null) { // Failover failed and ain't coming back. Knife the dispatcher. - _dispatcher.interrupt(); + _dispatcherThread.interrupt(); } } @@ -1852,7 +1864,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void startDispatcherIfNecessary() { //If we are the dispatcher then we don't need to check we are started - if (Thread.currentThread() == _dispatcher) + if (Thread.currentThread() == _dispatcherThread) { return; } @@ -1883,9 +1895,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcher == null) { _dispatcher = new Dispatcher(); - _dispatcher.setDaemon(true); + try + { + _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher); + + } + catch(Exception e) + { + throw new Error("Error creating Dispatcher thread",e); + } + _dispatcherThread.setName("Dispatcher-Channel-" + _channelId); + _dispatcherThread.setDaemon(true); _dispatcher.setConnectionStopped(initiallyStopped); - _dispatcher.start(); + _dispatcherThread.start(); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(_dispatcherThread.getName() + " created"); + } } else { @@ -2606,7 +2632,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - class Dispatcher extends Thread + class Dispatcher implements Runnable { /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ @@ -2615,21 +2641,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final Object _lock = new Object(); private String dispatcherID = "" + System.identityHashCode(this); - - public Dispatcher() { - super("Dispatcher-Channel-" + _channelId); - if (_dispatcherLogger.isInfoEnabled()) - { - _dispatcherLogger.info(getName() + " created"); - } } public void close() { _closed.set(true); - interrupt(); + _dispatcherThread.interrupt(); // fixme awaitTermination @@ -2708,7 +2727,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (_dispatcherLogger.isInfoEnabled()) { - _dispatcherLogger.info(getName() + " started"); + _dispatcherLogger.info(_dispatcherThread.getName() + " started"); } UnprocessedMessage message; @@ -2771,7 +2790,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcherLogger.isInfoEnabled()) { - _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); + _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId); } } |
