summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-17 10:20:24 +0000
committerKeith Wall <kwall@apache.org>2011-11-17 10:20:24 +0000
commit3214ce92d4bf7dcb922214c1b477e50fa196a7e0 (patch)
tree013309b4aebc6b6cf2bc6f89a7388157d0c379af /java/client
parent4ebfdc1abe47c5f437ca8516e1793e8b83ae8ed1 (diff)
downloadqpid-python-3214ce92d4bf7dcb922214c1b477e50fa196a7e0.tar.gz
QPID-2703: 0-10 Transaction rollback/recover does not restore consumer credit
Defect in Java client. 0-10 requires that commands are completed, but the Java client was failing to complete those commands corresponding to messages that were being rolled-back/recovered. Work by Robbie Gemmell and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1203139 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java1
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java23
3 files changed, 17 insertions, 30 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index c625849694..7bde470c8e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -49,7 +49,6 @@ import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -143,9 +142,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private int unackedCount = 0;
/**
- * USed to store the range of in tx messages
+ * Used to store the range of in tx messages
*/
- private RangeSet _txRangeSet = new RangeSet();
+ private final RangeSet _txRangeSet = new RangeSet();
private int _txSize = 0;
//--- constructors
@@ -459,6 +458,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
// release all unacked messages
RangeSet ranges = gatherUnackedRangeSet();
+ flushProcessed(ranges, false);
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
// We need to sync so that we get notify of an error.
sync();
@@ -481,12 +481,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return ranges;
}
-
public void releaseForRollback()
{
- getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
- _txRangeSet.clear();
- _txSize = 0;
+ if (_txSize > 0)
+ {
+ flushProcessed(_txRangeSet, false);
+ getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
+ _txRangeSet.clear();
+ _txSize = 0;
+ }
}
/**
@@ -500,6 +503,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// The value of requeue is always true
RangeSet ranges = new RangeSet();
ranges.add((int) deliveryTag);
+ flushProcessed(ranges, false);
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
//I don't think we need to sync
}
@@ -1334,6 +1338,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// messages sent by the brokers following the first rollback
// after failover
_highestDeliveryTag.set(-1);
+ // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to
+ //messages that came from the old broker.
+ _txRangeSet.clear();
+ _txSize = 0;
+ _unacknowledgedMessageTags.clear();
super.resubscribe();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 47c20b683c..57434c9a1d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -470,6 +470,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
+ _0_10session.flushProcessed(ranges, false);
_0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
clearReceiveQueue();
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index 68531eee84..f53fa8d83c 100644
--- a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -287,29 +287,6 @@ public class AMQSession_0_10Test extends TestCase
assertNotNull("ExecutionSync was not sent", event);
}
- public void testRejectMessage()
- {
- AMQSession_0_10 session = createAMQSession_0_10();
- session.rejectMessage(1l, true);
- ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
- assertNotNull("MessageRelease event was not sent", event);
- }
-
- public void testReleaseForRollback()
- {
- AMQSession_0_10 session = createAMQSession_0_10();
- try
- {
- session.releaseForRollback();
- }
- catch (Exception e)
- {
- fail("Unexpected exception is cought:" + e.getMessage());
- }
- ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false);
- assertNotNull("MessageRelease event was not sent", event);
- }
-
public void testSendQueueDelete()
{
AMQSession_0_10 session = createAMQSession_0_10();