diff options
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java | 23 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java | 111 |
2 files changed, 113 insertions, 21 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java index 95a59ecfd3..031e746e3a 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java @@ -37,17 +37,23 @@ public abstract class MessageActor /** * Indicates whether this MessageActor is closed. */ - boolean _isClosed = false; + private boolean _isClosed = false; /** * This messageActor's session */ - SessionImpl _session; + private SessionImpl _session; /** * The JMS destination this actor is set for. */ - DestinationImpl _destination; + private DestinationImpl _destination; + + + /** + * The ID of this actor for the session. + */ + private String _messageActorID; //-- Constructor @@ -141,4 +147,15 @@ public abstract class MessageActor return _session; } + /** + * Get the ID of this actor within its session. + * + * @return This actor ID. + */ + protected String getMessageActorID() + { + return _messageActorID; + } + + } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index ad682d9e70..1aca96582f 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -28,9 +28,9 @@ import javax.jms.Session; import javax.jms.Message; import javax.jms.MessageListener; import java.io.Serializable; -import java.util.ArrayList; import java.util.Vector; import java.util.LinkedList; +import java.util.HashMap; /** * Implementation of the JMS Session interface @@ -54,7 +54,7 @@ public class SessionImpl implements Session private boolean _hasStopped = false; /** - * lock for the sessionThread to wiat on when the session is stopped + * lock for the sessionThread to wait until the session is stopped */ private Object _stoppingLock = new Object(); @@ -63,11 +63,16 @@ public class SessionImpl implements Session */ private Object _stoppingJoin = new Object(); + /** + * thread to dispatch messages to async consumers + */ + private MessageDispatcherThread _messageDispatcherThread = null; + /** * The messageActors of this session. */ - private ArrayList<MessageActor> _messageActors = new ArrayList<MessageActor>(); + private HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>(); /** * All the not yet acknoledged messages @@ -151,6 +156,10 @@ public class SessionImpl implements Session { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } + // Create and start a MessageDispatcherThread + // This thread is dispatching messages to the async consumers + _messageDispatcherThread = new MessageDispatcherThread(); + _messageDispatcherThread.start(); } //--- javax.jms.Session API @@ -362,10 +371,43 @@ public class SessionImpl implements Session { if (!_isClosed) { + _messageDispatcherThread.interrupt(); + if (!_isClosing) + { + _isClosing = true; + // if the session is stopped then restart it before notifying on the lock + // that will stop the sessionThread + if (_isStopped) + { + start(); + } + + //stop the sessionThread + synchronized (_incomingAsynchronousMessages) + { + _incomingAsynchronousMessages.notifyAll(); + } + + try + { + _messageDispatcherThread.join(); + _messageDispatcherThread = null; + } + catch (InterruptedException ie) + { + /* ignore */ + } + } // from now all the session methods will throw a IllegalStateException _isClosed = true; // close all the actors closeAllActors(); + _messageActors.clear(); + synchronized (_incomingAsynchronousMessages) + { + _incomingAsynchronousMessages.clear(); + _incomingAsynchronousMessages.notifyAll(); + } // close the underlaying QpidSession try { @@ -375,6 +417,7 @@ public class SessionImpl implements Session { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } + } } @@ -466,7 +509,7 @@ public class SessionImpl implements Session checkNotClosed(); MessageProducerImpl producer = new MessageProducerImpl(this, (DestinationImpl) destination); // register this actor with the session - _messageActors.add(producer); + _messageActors.put(producer.getMessageActorID(), producer); return producer; } @@ -523,7 +566,7 @@ public class SessionImpl implements Session checkDestination(destination); MessageConsumerImpl consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); // register this actor with the session - _messageActors.add(consumer); + _messageActors.put(consumer.getMessageActorID(), consumer); return consumer; } @@ -610,7 +653,7 @@ public class SessionImpl implements Session checkNotClosed(); checkDestination(topic); TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name); - _messageActors.add(subscriber); + _messageActors.put(subscriber.getMessageActorID(), subscriber); return subscriber; } @@ -643,7 +686,7 @@ public class SessionImpl implements Session checkDestination(queue); QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, messageSelector); // register this actor with the session - _messageActors.add(browser); + _messageActors.put(browser.getMessageActorID(), browser); return browser; } @@ -710,7 +753,18 @@ public class SessionImpl implements Session */ protected void start() throws JMSException { - // TODO: make sure that the correct options are used + if (_isStopped) + { + synchronized (_stoppingLock) + { + _isStopped = false; + _stoppingLock.notify(); + } + synchronized (_stoppingJoin) + { + _hasStopped = false; + } + } } /** @@ -720,7 +774,30 @@ public class SessionImpl implements Session */ protected void stop() throws JMSException { - // TODO: make sure that the correct options are used + if (!_isClosing && !_isStopped) + { + synchronized (_incomingAsynchronousMessages) + { + _isStopped = true; + // unlock the sessionThread that will then wait on _stoppingLock + _incomingAsynchronousMessages.notifyAll(); + } + // wait for the sessionThread to stop processing messages + synchronized (_stoppingJoin) + { + while (!_hasStopped) + { + try + { + _stoppingJoin.wait(); + } + catch (InterruptedException e) + { + /* ignore */ + } + } + } + } } /** @@ -847,7 +924,7 @@ public class SessionImpl implements Session */ private void closeAllActors() throws JMSException { - for (MessageActor messageActor : _messageActors) + for (MessageActor messageActor : _messageActors.values()) { messageActor.closeMessageActor(); } @@ -861,8 +938,6 @@ public class SessionImpl implements Session * This thread is responsible for removing messages from m_incomingMessages and * dispatching them to the appropriate MessageConsumer. * <p> Messages have to be dispatched serially. - * - * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous message consumer {0} from session {1} has thrown a RunTimeException "{2}". */ private class MessageDispatcherThread extends Thread { @@ -932,27 +1007,27 @@ public class SessionImpl implements Session } } - /* if (message != null) + if (message != null) { MessageConsumerImpl mc; - synchronized (_actors) + synchronized (_messageActors) { - mc = (MessageConsumerImpl) m_actors.get(actorMessage.consumerID); + mc = null; // todo _messageActors.get(message.consumerID); } boolean consumed = false; if (mc != null) { try { - consumed = mc.onMessage(actorMessage.genericMessage); + // todo call onMessage } catch (RuntimeException t) { // the JMS specification tells us to flag that to the client! - log.errorb(SessionThread.class.getName(), "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t); + _logger.error("Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t); } } - } */ + } message = null; } while (!_isClosing); // repeat as long as this session is not closing |
