diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-17 10:20:24 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-17 10:20:24 +0000 |
| commit | 3214ce92d4bf7dcb922214c1b477e50fa196a7e0 (patch) | |
| tree | 013309b4aebc6b6cf2bc6f89a7388157d0c379af /java/client | |
| parent | 4ebfdc1abe47c5f437ca8516e1793e8b83ae8ed1 (diff) | |
| download | qpid-python-3214ce92d4bf7dcb922214c1b477e50fa196a7e0.tar.gz | |
QPID-2703: 0-10 Transaction rollback/recover does not restore consumer credit
Defect in Java client. 0-10 requires that commands are completed, but the Java client was failing to complete those commands
corresponding to messages that were being rolled-back/recovered. Work by Robbie Gemmell and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1203139 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
3 files changed, 17 insertions, 30 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index c625849694..7bde470c8e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -49,7 +49,6 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; -import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; @@ -143,9 +142,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private int unackedCount = 0; /** - * USed to store the range of in tx messages + * Used to store the range of in tx messages */ - private RangeSet _txRangeSet = new RangeSet(); + private final RangeSet _txRangeSet = new RangeSet(); private int _txSize = 0; //--- constructors @@ -459,6 +458,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { // release all unacked messages RangeSet ranges = gatherUnackedRangeSet(); + flushProcessed(ranges, false); getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); // We need to sync so that we get notify of an error. sync(); @@ -481,12 +481,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return ranges; } - public void releaseForRollback() { - getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); - _txRangeSet.clear(); - _txSize = 0; + if (_txSize > 0) + { + flushProcessed(_txRangeSet, false); + getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); + _txRangeSet.clear(); + _txSize = 0; + } } /** @@ -500,6 +503,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // The value of requeue is always true RangeSet ranges = new RangeSet(); ranges.add((int) deliveryTag); + flushProcessed(ranges, false); getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); //I don't think we need to sync } @@ -1334,6 +1338,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // messages sent by the brokers following the first rollback // after failover _highestDeliveryTag.set(-1); + // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to + //messages that came from the old broker. + _txRangeSet.clear(); + _txSize = 0; + _unacknowledgedMessageTags.clear(); super.resubscribe(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 47c20b683c..57434c9a1d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -470,6 +470,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } + _0_10session.flushProcessed(ranges, false); _0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); clearReceiveQueue(); } diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 68531eee84..f53fa8d83c 100644 --- a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -287,29 +287,6 @@ public class AMQSession_0_10Test extends TestCase assertNotNull("ExecutionSync was not sent", event); } - public void testRejectMessage() - { - AMQSession_0_10 session = createAMQSession_0_10(); - session.rejectMessage(1l, true); - ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false); - assertNotNull("MessageRelease event was not sent", event); - } - - public void testReleaseForRollback() - { - AMQSession_0_10 session = createAMQSession_0_10(); - try - { - session.releaseForRollback(); - } - catch (Exception e) - { - fail("Unexpected exception is cought:" + e.getMessage()); - } - ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false); - assertNotNull("MessageRelease event was not sent", event); - } - public void testSendQueueDelete() { AMQSession_0_10 session = createAMQSession_0_10(); |
