diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-04-14 23:04:07 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-04-14 23:04:07 +0000 |
| commit | 9568dd0e968b5946e483443b191ee6b424edc201 (patch) | |
| tree | 2fb1c17426a563d641b3f3955c6d755b88ae27bb | |
| parent | 46aa50115f938ffd7360f279183433cf36668135 (diff) | |
| download | qpid-python-9568dd0e968b5946e483443b191ee6b424edc201.tar.gz | |
Commiting the patch attached to QPID-2471
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@934236 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 88 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b5ad42d8e1..175a4ffc77 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -45,6 +45,7 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; @@ -65,6 +66,7 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -279,7 +281,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the highest received delivery tag. */ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); - + /** All the not yet acknowledged message tags */ protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); @@ -1471,7 +1473,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.debug("Message[" + message.toString() + "] received in session"); } _highestDeliveryTag.set(message.getDeliveryTag()); - _queue.add(message); + _queue.add(message); } public void declareAndBind(AMQDestination amqd) @@ -1500,13 +1502,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * </ul> * * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and - * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible + * receiving acknowledgment that it has then a JMSException will be thrown. In this case it will not be possible * for the client to determine whether the broker is going to recover the session or not. * * @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error. * Not that this does not necessarily mean that the recovery has failed, but simply that it is * not possible to tell if it has or not. * @todo Be aware of possible changes to parameter order as versions change. + * + * Strategy for handling recover. + * Flush any acks not yet sent. + * Stop the message flow. + * Clear the dispatch queue and the consumer queues. + * Release/Reject all messages received but not yet acknowledged. + * Start the message flow. */ public void recover() throws JMSException { @@ -1516,6 +1525,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Ensure that the session is not transacted. checkNotTransacted(); + // flush any acks we are holding in the buffer. + flushAcknowledgments(); + // this is set only here, and the before the consumer's onMessage is called it is set to false _inRecovery = true; try @@ -1527,16 +1539,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { suspendChannel(true); } - + + syncDispatchQueue(); + if (_dispatcher != null) { - _dispatcher.rollback(); + _dispatcher.recover(); } sendRecover(); - + markClean(); - + + // Set inRecovery to false before you start message flow again again. + _inRecovery = false; + if (!isSuspended) { suspendChannel(false); @@ -1550,10 +1567,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e); } + } protected abstract void sendRecover() throws AMQException, FailoverException; + protected abstract void flushAcknowledgments(); + public void rejectMessage(UnprocessedMessage message, boolean requeue) { @@ -2938,6 +2958,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } + public void recover() + { + + synchronized (_lock) + { + boolean isStopped = connectionStopped(); + + if (!isStopped) + { + setConnectionStopped(true); + } + + _dispatcherLogger.debug("Session clearing the consumer queues"); + + for (C consumer : _consumers.values()) + { + List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); + _unacknowledgedMessageTags.addAll(tags); + } + + setConnectionStopped(isStopped); + } + + } + + public void run() { if (_dispatcherLogger.isInfoEnabled()) @@ -3032,6 +3078,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { rejectMessage(message, true); } + else if (isInRecovery()) + { + _unacknowledgedMessageTags.add(deliveryTag); + } else { synchronized (_messageDeliveryLock) @@ -3045,7 +3095,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (updateRollbackMark(current, deliveryTag)) { _rollbackMark.compareAndSet(current, deliveryTag); - } + } } private void notifyConsumer(UnprocessedMessage message) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 71a4010d62..704dbf8bfc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -280,7 +280,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - void flushAcknowledgments() + protected void flushAcknowledgments() { flushAcknowledgments(false); } @@ -447,7 +447,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ public void sendRecover() throws AMQException, FailoverException { - // release all unack messages + // release all unacked messages RangeSet ranges = new RangeSet(); while (true) { @@ -464,6 +464,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getCurrentException(); } + public void releaseForRollback() { getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index edcdbebba9..939e29e4d5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -597,4 +597,9 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B throw new UnsupportedOperationException("The new addressing based sytanx is " + "not supported for AMQP 0-8/0-9 versions"); } + + protected void flushAcknowledgments() + { + + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index bea43cc232..a1e94aaadd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -1043,9 +1043,25 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } public void clearReceiveQueue() - { + { _synchronousQueue.clear(); } + + + public List<Long> drainReceiverQueueAndRetrieveDeliveryTags() + { + Iterator<AbstractJMSMessage> iterator = _synchronousQueue.iterator(); + List<Long> tags = new ArrayList<Long>(_synchronousQueue.size()); + + while (iterator.hasNext()) + { + + AbstractJMSMessage msg = iterator.next(); + tags.add(msg.getDeliveryTag()); + iterator.remove(); + } + return tags; + } public AMQShortString getQueuename() { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index dd8377a94a..f7a37e4894 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -183,4 +183,9 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe throw new UnsupportedOperationException("The new addressing based sytanx is " + "not supported for AMQP 0-8/0-9 versions"); } + + @Override + protected void flushAcknowledgments() + { + } } |
