diff options
Diffstat (limited to 'java/client/src/main')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 155 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 14 |
2 files changed, 95 insertions, 74 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b6429972df..c436121855 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -93,8 +93,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private final FlowControllingBlockingQueue _queue; - private final java.util.Queue<MessageConsumerPair> _reprocessQueue; - private Dispatcher _dispatcher; private MessageFactoryRegistry _messageFactoryRegistry; @@ -136,20 +134,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private long _nextProducerId; - /** - * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. - */ - private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - - /** - * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer - */ - private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false); - - /** - * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer - */ - private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false); /** * Set when recover is called. This is to handle the case where recover() is called by application code @@ -157,14 +141,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private boolean _inRecovery; + private boolean _connectionStopped; + private boolean _hasMessageListeners; /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ - + private class Dispatcher extends Thread { + + /** + * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. + */ + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private final Object _lock = new Object(); + public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -173,12 +167,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void run() { UnprocessedMessage message; - _stopped.set(false); + try { - while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) + while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) { - dispatchMessage(message); + synchronized (_lock) + { + + while (connectionStopped()) + { + _lock.wait(); + } + + dispatchMessage(message); + + while (connectionStopped()) + { + _lock.wait(); + } + + } + } } catch (InterruptedException e) @@ -189,6 +199,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _logger.info("Dispatcher thread terminating for channel " + _channelId); } + // only call while holding lock + final boolean connectionStopped() + { + return _connectionStopped; + } + + void setConnectionStopped(boolean connectionStopped) + { + synchronized (_lock) + { + _connectionStopped = connectionStopped; + _lock.notify(); + } + } + private void dispatchMessage(UnprocessedMessage message) { if (message.getDeliverBody() != null) @@ -246,15 +271,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - public void stopDispatcher() + public void close() { - _stopped.set(true); + _closed.set(true); interrupt(); + + //fixme awaitTermination + } } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry) { @@ -285,8 +312,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _defaultPrefetchHighMark = defaultPrefetchHighMark; _defaultPrefetchLowMark = defaultPrefetchLowMark; - _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>(); - if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, @@ -446,7 +471,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - + public void rollback() throws JMSException { checkTransacted(); @@ -654,7 +679,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - _dispatcher.stopDispatcher(); + _dispatcher.close(); + _dispatcher = null; } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception @@ -680,7 +706,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - _dispatcher.stopDispatcher(); + _dispatcher.close(); + _dispatcher = null; } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception @@ -712,8 +739,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // TODO: Be aware of possible changes to parameter order as versions change. getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false)); // requeue + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + false)); // requeue } boolean isInRecovery() @@ -743,37 +770,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageListener getMessageListener() throws JMSException { - checkNotClosed(); +// checkNotClosed(); return _messageListener; } public void setMessageListener(MessageListener listener) throws JMSException { - checkNotClosed(); - - if (!isStopped()) - { - throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); - } - - // We are stopped - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - BasicMessageConsumer consumer = i.next(); - - if (consumer.isReceiving()) - { - throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); - } - } - - _messageListener = listener; - - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - i.next().setMessageListener(_messageListener); - } - +// checkNotClosed(); +// +// if (_dispatcher != null && !_dispatcher.connectionStopped()) +// { +// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); +// } +// +// // We are stopped +// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) +// { +// BasicMessageConsumer consumer = i.next(); +// +// if (consumer.isReceiving()) +// { +// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); +// } +// } +// +// _messageListener = listener; +// +// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) +// { +// i.next().setMessageListener(_messageListener); +// } } @@ -1582,13 +1608,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void start() { + //fixme This should be controlled by _stopped as it pairs with the stop method + //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled. + //will result in sending Flow messages for each subsequent call to flow.. only need to do this + // if we have called stop. if (_startedAtLeastOnce.getAndSet(true)) { //then we stopped this and are restarting, so signal server to resume delivery unsuspendChannel(); } - if(hasMessageListeners() && _dispatcher == null) + if (hasMessageListeners()) { startDistpatcherIfNecessary(); } @@ -1606,7 +1636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { - if(_dispatcher == null) + if (_dispatcher == null) { _dispatcher = new Dispatcher(); _dispatcher.setDaemon(true); @@ -1618,14 +1648,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { //stop the server delivering messages to this session suspendChannel(); - - //stop the dispatcher thread - _stopped.set(true); - } - - boolean isStopped() - { - return _stopped.get(); + _dispatcher.setConnectionStopped(true); } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 4ffec6fb41..832c312634 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -212,16 +212,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer //i.e. it is only valid to call this method if // - // (a) the session is stopped, in which case the dispatcher is not running + // (a) the connection is stopped, in which case the dispatcher is not running // OR // (b) the listener is null AND we are not receiving synchronously at present // - if (_session.isStopped()) + if (!_session.getAMQConnection().started()) { _messageListener.set(messageListener); _session.setHasMessageListeners(); - _session.startDistpatcherIfNecessary(); if (_logger.isDebugEnabled()) { @@ -248,7 +247,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer synchronized (_session) { - _messageListener.set(messageListener); _session.setHasMessageListeners(); _session.startDistpatcherIfNecessary(); @@ -329,12 +327,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { - _session.startDistpatcherIfNecessary(); checkPreConditions(); acquireReceiving(); + _session.startDistpatcherIfNecessary(); + try { if (closeOnAutoClose()) @@ -385,12 +384,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { - _session.startDistpatcherIfNecessary(); - checkPreConditions(); acquireReceiving(); + _session.startDistpatcherIfNecessary(); + try { if (closeOnAutoClose()) @@ -560,7 +559,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - //This shouldn't be possible. _synchronousQueue.put(jmsMessage); } } |
