summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java155
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java14
2 files changed, 95 insertions, 74 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b6429972df..c436121855 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -93,8 +93,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private final FlowControllingBlockingQueue _queue;
- private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
-
private Dispatcher _dispatcher;
private MessageFactoryRegistry _messageFactoryRegistry;
@@ -136,20 +134,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private long _nextProducerId;
- /**
- * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
- */
- private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
-
- /**
- * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
- */
- private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false);
-
- /**
- * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
- */
- private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false);
/**
* Set when recover is called. This is to handle the case where recover() is called by application code
@@ -157,14 +141,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private boolean _inRecovery;
+ private boolean _connectionStopped;
+
private boolean _hasMessageListeners;
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
-
+
private class Dispatcher extends Thread
{
+
+ /**
+ * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
+ */
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private final Object _lock = new Object();
+
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -173,12 +167,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void run()
{
UnprocessedMessage message;
- _stopped.set(false);
+
try
{
- while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
+ while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
{
- dispatchMessage(message);
+ synchronized (_lock)
+ {
+
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+
+ dispatchMessage(message);
+
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+
+ }
+
}
}
catch (InterruptedException e)
@@ -189,6 +199,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_logger.info("Dispatcher thread terminating for channel " + _channelId);
}
+ // only call while holding lock
+ final boolean connectionStopped()
+ {
+ return _connectionStopped;
+ }
+
+ void setConnectionStopped(boolean connectionStopped)
+ {
+ synchronized (_lock)
+ {
+ _connectionStopped = connectionStopped;
+ _lock.notify();
+ }
+ }
+
private void dispatchMessage(UnprocessedMessage message)
{
if (message.getDeliverBody() != null)
@@ -246,15 +271,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- public void stopDispatcher()
+ public void close()
{
- _stopped.set(true);
+ _closed.set(true);
interrupt();
+
+ //fixme awaitTermination
+
}
}
-
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry)
{
@@ -285,8 +312,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_defaultPrefetchHighMark = defaultPrefetchHighMark;
_defaultPrefetchLowMark = defaultPrefetchLowMark;
- _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
-
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -446,7 +471,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
-
+
public void rollback() throws JMSException
{
checkTransacted();
@@ -654,7 +679,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- _dispatcher.stopDispatcher();
+ _dispatcher.close();
+ _dispatcher = null;
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
@@ -680,7 +706,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- _dispatcher.stopDispatcher();
+ _dispatcher.close();
+ _dispatcher = null;
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
@@ -712,8 +739,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// TODO: Be aware of possible changes to parameter order as versions change.
getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- false)); // requeue
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ false)); // requeue
}
boolean isInRecovery()
@@ -743,37 +770,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageListener getMessageListener() throws JMSException
{
- checkNotClosed();
+// checkNotClosed();
return _messageListener;
}
public void setMessageListener(MessageListener listener) throws JMSException
{
- checkNotClosed();
-
- if (!isStopped())
- {
- throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
- }
-
- // We are stopped
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- BasicMessageConsumer consumer = i.next();
-
- if (consumer.isReceiving())
- {
- throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
- }
- }
-
- _messageListener = listener;
-
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- i.next().setMessageListener(_messageListener);
- }
-
+// checkNotClosed();
+//
+// if (_dispatcher != null && !_dispatcher.connectionStopped())
+// {
+// throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+// }
+//
+// // We are stopped
+// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+// {
+// BasicMessageConsumer consumer = i.next();
+//
+// if (consumer.isReceiving())
+// {
+// throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+// }
+// }
+//
+// _messageListener = listener;
+//
+// for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+// {
+// i.next().setMessageListener(_messageListener);
+// }
}
@@ -1582,13 +1608,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
void start()
{
+ //fixme This should be controlled by _stopped as it pairs with the stop method
+ //fixme or check the FlowControlledBlockingQueue _queue to see if we have flow controlled.
+ //will result in sending Flow messages for each subsequent call to flow.. only need to do this
+ // if we have called stop.
if (_startedAtLeastOnce.getAndSet(true))
{
//then we stopped this and are restarting, so signal server to resume delivery
unsuspendChannel();
}
- if(hasMessageListeners() && _dispatcher == null)
+ if (hasMessageListeners())
{
startDistpatcherIfNecessary();
}
@@ -1606,7 +1636,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
- if(_dispatcher == null)
+ if (_dispatcher == null)
{
_dispatcher = new Dispatcher();
_dispatcher.setDaemon(true);
@@ -1618,14 +1648,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
//stop the server delivering messages to this session
suspendChannel();
-
- //stop the dispatcher thread
- _stopped.set(true);
- }
-
- boolean isStopped()
- {
- return _stopped.get();
+ _dispatcher.setConnectionStopped(true);
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 4ffec6fb41..832c312634 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -212,16 +212,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
//i.e. it is only valid to call this method if
//
- // (a) the session is stopped, in which case the dispatcher is not running
+ // (a) the connection is stopped, in which case the dispatcher is not running
// OR
// (b) the listener is null AND we are not receiving synchronously at present
//
- if (_session.isStopped())
+ if (!_session.getAMQConnection().started())
{
_messageListener.set(messageListener);
_session.setHasMessageListeners();
- _session.startDistpatcherIfNecessary();
if (_logger.isDebugEnabled())
{
@@ -248,7 +247,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
synchronized (_session)
{
-
_messageListener.set(messageListener);
_session.setHasMessageListeners();
_session.startDistpatcherIfNecessary();
@@ -329,12 +327,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
- _session.startDistpatcherIfNecessary();
checkPreConditions();
acquireReceiving();
+ _session.startDistpatcherIfNecessary();
+
try
{
if (closeOnAutoClose())
@@ -385,12 +384,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receiveNoWait() throws JMSException
{
- _session.startDistpatcherIfNecessary();
-
checkPreConditions();
acquireReceiving();
+ _session.startDistpatcherIfNecessary();
+
try
{
if (closeOnAutoClose())
@@ -560,7 +559,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
- //This shouldn't be possible.
_synchronousQueue.put(jmsMessage);
}
}