From 9568dd0e968b5946e483443b191ee6b424edc201 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 14 Apr 2010 23:04:07 +0000 Subject: Commiting the patch attached to QPID-2471 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@934236 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 66 +++++++++++++++++++--- .../org/apache/qpid/client/AMQSession_0_10.java | 5 +- .../org/apache/qpid/client/AMQSession_0_8.java | 5 ++ .../apache/qpid/client/BasicMessageConsumer.java | 18 +++++- .../qpid/test/unit/message/TestAMQSession.java | 5 ++ 5 files changed, 88 insertions(+), 11 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b5ad42d8e1..175a4ffc77 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -45,6 +45,7 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; @@ -65,6 +66,7 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -279,7 +281,7 @@ public abstract class AMQSession _unacknowledgedMessageTags = new ConcurrentLinkedQueue(); @@ -1471,7 +1473,7 @@ public abstract class AMQSession * *

If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and - * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible + * receiving acknowledgment that it has then a JMSException will be thrown. In this case it will not be possible * for the client to determine whether the broker is going to recover the session or not. * * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. * Not that this does not necessarily mean that the recovery has failed, but simply that it is * not possible to tell if it has or not. * @todo Be aware of possible changes to parameter order as versions change. + * + * Strategy for handling recover. + * Flush any acks not yet sent. + * Stop the message flow. + * Clear the dispatch queue and the consumer queues. + * Release/Reject all messages received but not yet acknowledged. + * Start the message flow. */ public void recover() throws JMSException { @@ -1516,6 +1525,9 @@ public abstract class AMQSession tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); + _unacknowledgedMessageTags.addAll(tags); + } + + setConnectionStopped(isStopped); + } + + } + + public void run() { if (_dispatcherLogger.isInfoEnabled()) @@ -3032,6 +3078,10 @@ public abstract class AMQSession extends Closeable implements Messa } public void clearReceiveQueue() - { + { _synchronousQueue.clear(); } + + + public List drainReceiverQueueAndRetrieveDeliveryTags() + { + Iterator iterator = _synchronousQueue.iterator(); + List tags = new ArrayList(_synchronousQueue.size()); + + while (iterator.hasNext()) + { + + AbstractJMSMessage msg = iterator.next(); + tags.add(msg.getDeliveryTag()); + iterator.remove(); + } + return tags; + } public AMQShortString getQueuename() { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index dd8377a94a..f7a37e4894 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -183,4 +183,9 @@ public class TestAMQSession extends AMQSession