summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java111
2 files changed, 113 insertions, 21 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
index 95a59ecfd3..031e746e3a 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
@@ -37,17 +37,23 @@ public abstract class MessageActor
/**
* Indicates whether this MessageActor is closed.
*/
- boolean _isClosed = false;
+ private boolean _isClosed = false;
/**
* This messageActor's session
*/
- SessionImpl _session;
+ private SessionImpl _session;
/**
* The JMS destination this actor is set for.
*/
- DestinationImpl _destination;
+ private DestinationImpl _destination;
+
+
+ /**
+ * The ID of this actor for the session.
+ */
+ private String _messageActorID;
//-- Constructor
@@ -141,4 +147,15 @@ public abstract class MessageActor
return _session;
}
+ /**
+ * Get the ID of this actor within its session.
+ *
+ * @return This actor ID.
+ */
+ protected String getMessageActorID()
+ {
+ return _messageActorID;
+ }
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
index ad682d9e70..1aca96582f 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
@@ -28,9 +28,9 @@ import javax.jms.Session;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Vector;
import java.util.LinkedList;
+import java.util.HashMap;
/**
* Implementation of the JMS Session interface
@@ -54,7 +54,7 @@ public class SessionImpl implements Session
private boolean _hasStopped = false;
/**
- * lock for the sessionThread to wiat on when the session is stopped
+ * lock for the sessionThread to wait until the session is stopped
*/
private Object _stoppingLock = new Object();
@@ -63,11 +63,16 @@ public class SessionImpl implements Session
*/
private Object _stoppingJoin = new Object();
+ /**
+ * thread to dispatch messages to async consumers
+ */
+ private MessageDispatcherThread _messageDispatcherThread = null;
+
/**
* The messageActors of this session.
*/
- private ArrayList<MessageActor> _messageActors = new ArrayList<MessageActor>();
+ private HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>();
/**
* All the not yet acknoledged messages
@@ -151,6 +156,10 @@ public class SessionImpl implements Session
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
+ // Create and start a MessageDispatcherThread
+ // This thread is dispatching messages to the async consumers
+ _messageDispatcherThread = new MessageDispatcherThread();
+ _messageDispatcherThread.start();
}
//--- javax.jms.Session API
@@ -362,10 +371,43 @@ public class SessionImpl implements Session
{
if (!_isClosed)
{
+ _messageDispatcherThread.interrupt();
+ if (!_isClosing)
+ {
+ _isClosing = true;
+ // if the session is stopped then restart it before notifying on the lock
+ // that will stop the sessionThread
+ if (_isStopped)
+ {
+ start();
+ }
+
+ //stop the sessionThread
+ synchronized (_incomingAsynchronousMessages)
+ {
+ _incomingAsynchronousMessages.notifyAll();
+ }
+
+ try
+ {
+ _messageDispatcherThread.join();
+ _messageDispatcherThread = null;
+ }
+ catch (InterruptedException ie)
+ {
+ /* ignore */
+ }
+ }
// from now all the session methods will throw a IllegalStateException
_isClosed = true;
// close all the actors
closeAllActors();
+ _messageActors.clear();
+ synchronized (_incomingAsynchronousMessages)
+ {
+ _incomingAsynchronousMessages.clear();
+ _incomingAsynchronousMessages.notifyAll();
+ }
// close the underlaying QpidSession
try
{
@@ -375,6 +417,7 @@ public class SessionImpl implements Session
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
+
}
}
@@ -466,7 +509,7 @@ public class SessionImpl implements Session
checkNotClosed();
MessageProducerImpl producer = new MessageProducerImpl(this, (DestinationImpl) destination);
// register this actor with the session
- _messageActors.add(producer);
+ _messageActors.put(producer.getMessageActorID(), producer);
return producer;
}
@@ -523,7 +566,7 @@ public class SessionImpl implements Session
checkDestination(destination);
MessageConsumerImpl consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
// register this actor with the session
- _messageActors.add(consumer);
+ _messageActors.put(consumer.getMessageActorID(), consumer);
return consumer;
}
@@ -610,7 +653,7 @@ public class SessionImpl implements Session
checkNotClosed();
checkDestination(topic);
TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name);
- _messageActors.add(subscriber);
+ _messageActors.put(subscriber.getMessageActorID(), subscriber);
return subscriber;
}
@@ -643,7 +686,7 @@ public class SessionImpl implements Session
checkDestination(queue);
QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, messageSelector);
// register this actor with the session
- _messageActors.add(browser);
+ _messageActors.put(browser.getMessageActorID(), browser);
return browser;
}
@@ -710,7 +753,18 @@ public class SessionImpl implements Session
*/
protected void start() throws JMSException
{
- // TODO: make sure that the correct options are used
+ if (_isStopped)
+ {
+ synchronized (_stoppingLock)
+ {
+ _isStopped = false;
+ _stoppingLock.notify();
+ }
+ synchronized (_stoppingJoin)
+ {
+ _hasStopped = false;
+ }
+ }
}
/**
@@ -720,7 +774,30 @@ public class SessionImpl implements Session
*/
protected void stop() throws JMSException
{
- // TODO: make sure that the correct options are used
+ if (!_isClosing && !_isStopped)
+ {
+ synchronized (_incomingAsynchronousMessages)
+ {
+ _isStopped = true;
+ // unlock the sessionThread that will then wait on _stoppingLock
+ _incomingAsynchronousMessages.notifyAll();
+ }
+ // wait for the sessionThread to stop processing messages
+ synchronized (_stoppingJoin)
+ {
+ while (!_hasStopped)
+ {
+ try
+ {
+ _stoppingJoin.wait();
+ }
+ catch (InterruptedException e)
+ {
+ /* ignore */
+ }
+ }
+ }
+ }
}
/**
@@ -847,7 +924,7 @@ public class SessionImpl implements Session
*/
private void closeAllActors() throws JMSException
{
- for (MessageActor messageActor : _messageActors)
+ for (MessageActor messageActor : _messageActors.values())
{
messageActor.closeMessageActor();
}
@@ -861,8 +938,6 @@ public class SessionImpl implements Session
* This thread is responsible for removing messages from m_incomingMessages and
* dispatching them to the appropriate MessageConsumer.
* <p> Messages have to be dispatched serially.
- *
- * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous message consumer {0} from session {1} has thrown a RunTimeException "{2}".
*/
private class MessageDispatcherThread extends Thread
{
@@ -932,27 +1007,27 @@ public class SessionImpl implements Session
}
}
- /* if (message != null)
+ if (message != null)
{
MessageConsumerImpl mc;
- synchronized (_actors)
+ synchronized (_messageActors)
{
- mc = (MessageConsumerImpl) m_actors.get(actorMessage.consumerID);
+ mc = null; // todo _messageActors.get(message.consumerID);
}
boolean consumed = false;
if (mc != null)
{
try
{
- consumed = mc.onMessage(actorMessage.genericMessage);
+ // todo call onMessage
}
catch (RuntimeException t)
{
// the JMS specification tells us to flag that to the client!
- log.errorb(SessionThread.class.getName(), "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t);
+ _logger.error("Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t);
}
}
- } */
+ }
message = null;
}
while (!_isClosing); // repeat as long as this session is not closing