summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-11-21 17:57:16 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-11-21 17:57:16 +0000
commit61523667e8589275138a66ad23fda254c66c7dfe (patch)
treeb527337e67db558c4612a01831281a3eb679c1f3 /java/client/src
parentda89c7fe7cb06c3bb8c514fd31af353f3c53c978 (diff)
downloadqpid-python-61523667e8589275138a66ad23fda254c66c7dfe.tar.gz
Appologies for the sudden checkin without notice, close to the release cycle.
Reverting the changes back. Will attach a patch and commit after the release. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@719657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java55
1 files changed, 18 insertions, 37 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index af0ed3faa3..b5d12d9520 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -67,26 +67,16 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.AMQMessageDelegateFactory;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.CloseConsumerMessage;
-import org.apache.qpid.client.message.JMSBytesMessage;
-import org.apache.qpid.client.message.JMSMapMessage;
-import org.apache.qpid.client.message.JMSObjectMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
-import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -281,8 +271,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the dispatcher thread for this session. */
protected Dispatcher _dispatcher;
-
- protected Thread _dispatcherThread;
/** Holds the message factory factory for this session. */
protected MessageFactoryRegistry _messageFactoryRegistry;
@@ -680,7 +668,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcher != null)
{
// Failover failed and ain't coming back. Knife the dispatcher.
- _dispatcherThread.interrupt();
+ _dispatcher.interrupt();
}
}
@@ -1864,7 +1852,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void startDispatcherIfNecessary()
{
//If we are the dispatcher then we don't need to check we are started
- if (Thread.currentThread() == _dispatcherThread)
+ if (Thread.currentThread() == _dispatcher)
{
return;
}
@@ -1895,23 +1883,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcher == null)
{
_dispatcher = new Dispatcher();
- try
- {
- _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
-
- }
- catch(Exception e)
- {
- throw new Error("Error creating Dispatcher thread",e);
- }
- _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
- _dispatcherThread.setDaemon(true);
+ _dispatcher.setDaemon(true);
_dispatcher.setConnectionStopped(initiallyStopped);
- _dispatcherThread.start();
- if (_dispatcherLogger.isInfoEnabled())
- {
- _dispatcherLogger.info(_dispatcherThread.getName() + " created");
- }
+ _dispatcher.start();
}
else
{
@@ -2632,7 +2606,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
- class Dispatcher implements Runnable
+ class Dispatcher extends Thread
{
/** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
@@ -2641,14 +2615,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final Object _lock = new Object();
private String dispatcherID = "" + System.identityHashCode(this);
+
+
public Dispatcher()
{
+ super("Dispatcher-Channel-" + _channelId);
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(getName() + " created");
+ }
}
public void close()
{
_closed.set(true);
- _dispatcherThread.interrupt();
+ interrupt();
// fixme awaitTermination
@@ -2727,7 +2708,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " started");
+ _dispatcherLogger.info(getName() + " started");
}
UnprocessedMessage message;
@@ -2790,7 +2771,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId);
+ _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
}
}