summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-04-14 23:04:07 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-04-14 23:04:07 +0000
commit9568dd0e968b5946e483443b191ee6b424edc201 (patch)
tree2fb1c17426a563d641b3f3955c6d755b88ae27bb
parent46aa50115f938ffd7360f279183433cf36668135 (diff)
downloadqpid-python-9568dd0e968b5946e483443b191ee6b424edc201.tar.gz
Commiting the patch attached to QPID-2471
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@934236 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java66
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java18
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java5
5 files changed, 88 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b5ad42d8e1..175a4ffc77 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -45,6 +45,7 @@ import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
@@ -65,6 +66,7 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -279,7 +281,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the highest received delivery tag. */
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
-
+
/** All the not yet acknowledged message tags */
protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
@@ -1471,7 +1473,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_logger.debug("Message[" + message.toString() + "] received in session");
}
_highestDeliveryTag.set(message.getDeliveryTag());
- _queue.add(message);
+ _queue.add(message);
}
public void declareAndBind(AMQDestination amqd)
@@ -1500,13 +1502,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* </ul>
*
* <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
- * receiving acknolwedgement that it hasm then a JMSException will be thrown. In this case it will not be possible
+ * receiving acknowledgment that it has then a JMSException will be thrown. In this case it will not be possible
* for the client to determine whether the broker is going to recover the session or not.
*
* @throws JMSException If the JMS provider fails to stop and restart message delivery due to some internal error.
* Not that this does not necessarily mean that the recovery has failed, but simply that it is
* not possible to tell if it has or not.
* @todo Be aware of possible changes to parameter order as versions change.
+ *
+ * Strategy for handling recover.
+ * Flush any acks not yet sent.
+ * Stop the message flow.
+ * Clear the dispatch queue and the consumer queues.
+ * Release/Reject all messages received but not yet acknowledged.
+ * Start the message flow.
*/
public void recover() throws JMSException
{
@@ -1516,6 +1525,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Ensure that the session is not transacted.
checkNotTransacted();
+ // flush any acks we are holding in the buffer.
+ flushAcknowledgments();
+
// this is set only here, and the before the consumer's onMessage is called it is set to false
_inRecovery = true;
try
@@ -1527,16 +1539,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
suspendChannel(true);
}
-
+
+ syncDispatchQueue();
+
if (_dispatcher != null)
{
- _dispatcher.rollback();
+ _dispatcher.recover();
}
sendRecover();
-
+
markClean();
-
+
+ // Set inRecovery to false before you start message flow again again.
+ _inRecovery = false;
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1550,10 +1567,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
throw new JMSAMQException("Recovery was interrupted by fail-over. Recovery status is not known.", e);
}
+
}
protected abstract void sendRecover() throws AMQException, FailoverException;
+ protected abstract void flushAcknowledgments();
+
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
@@ -2938,6 +2958,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
+ public void recover()
+ {
+
+ synchronized (_lock)
+ {
+ boolean isStopped = connectionStopped();
+
+ if (!isStopped)
+ {
+ setConnectionStopped(true);
+ }
+
+ _dispatcherLogger.debug("Session clearing the consumer queues");
+
+ for (C consumer : _consumers.values())
+ {
+ List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
+ _unacknowledgedMessageTags.addAll(tags);
+ }
+
+ setConnectionStopped(isStopped);
+ }
+
+ }
+
+
public void run()
{
if (_dispatcherLogger.isInfoEnabled())
@@ -3032,6 +3078,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
rejectMessage(message, true);
}
+ else if (isInRecovery())
+ {
+ _unacknowledgedMessageTags.add(deliveryTag);
+ }
else
{
synchronized (_messageDeliveryLock)
@@ -3045,7 +3095,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (updateRollbackMark(current, deliveryTag))
{
_rollbackMark.compareAndSet(current, deliveryTag);
- }
+ }
}
private void notifyConsumer(UnprocessedMessage message)
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 71a4010d62..704dbf8bfc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -280,7 +280,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- void flushAcknowledgments()
+ protected void flushAcknowledgments()
{
flushAcknowledgments(false);
}
@@ -447,7 +447,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
public void sendRecover() throws AMQException, FailoverException
{
- // release all unack messages
+ // release all unacked messages
RangeSet ranges = new RangeSet();
while (true)
{
@@ -464,6 +464,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getCurrentException();
}
+
public void releaseForRollback()
{
getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index edcdbebba9..939e29e4d5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -597,4 +597,9 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
throw new UnsupportedOperationException("The new addressing based sytanx is "
+ "not supported for AMQP 0-8/0-9 versions");
}
+
+ protected void flushAcknowledgments()
+ {
+
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index bea43cc232..a1e94aaadd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -1043,9 +1043,25 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
public void clearReceiveQueue()
- {
+ {
_synchronousQueue.clear();
}
+
+
+ public List<Long> drainReceiverQueueAndRetrieveDeliveryTags()
+ {
+ Iterator<AbstractJMSMessage> iterator = _synchronousQueue.iterator();
+ List<Long> tags = new ArrayList<Long>(_synchronousQueue.size());
+
+ while (iterator.hasNext())
+ {
+
+ AbstractJMSMessage msg = iterator.next();
+ tags.add(msg.getDeliveryTag());
+ iterator.remove();
+ }
+ return tags;
+ }
public AMQShortString getQueuename()
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index dd8377a94a..f7a37e4894 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -183,4 +183,9 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
throw new UnsupportedOperationException("The new addressing based sytanx is "
+ "not supported for AMQP 0-8/0-9 versions");
}
+
+ @Override
+ protected void flushAcknowledgments()
+ {
+ }
}