diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-01-11 15:24:44 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-01-11 15:24:44 +0000 |
| commit | 455cd59a5183e9e18bc47d7f0288636ecd5eec37 (patch) | |
| tree | 798b639d682405588d6a3f845208226530ca6549 /java | |
| parent | 01f8f7e488d44323fff9550aeefddc38bb99e0ca (diff) | |
| download | qpid-python-455cd59a5183e9e18bc47d7f0288636ecd5eec37.tar.gz | |
QPID-3604 Reverting the changes as it releases messages everytime the
channel is suspended. This results in several test failures.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1230088 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 9 insertions, 77 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 784b75af10..48c4e3e3e6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). */ - protected volatile boolean _usingDispatcherForCleanup; + private volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; @@ -3570,3 +3570,4 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 49b77dcc7b..8395c8f4b7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -795,43 +795,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (suspend) { - synchronized (getMessageDeliveryLock()) - { - for (BasicMessageConsumer consumer : _consumers.values()) - { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), - Option.UNRELIABLE); - sync(); - List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); - _prefetchedMessageTags.addAll(tags); - } - } - - _usingDispatcherForCleanup = true; - syncDispatchQueue(); - _usingDispatcherForCleanup = false; - - RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); - RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); - RangeSet all = RangeSetFactory.createRangeSet(delivered.size() - + prefetched.size()); - - for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) - { - Range range = deliveredIter.next(); - all.add(range); - } - - for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) - { - Range range = prefetchedIter.next(); - all.add(range); - } - - flushProcessed(all, false); - getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); - getQpidSession().messageRelease(prefetched); - sync(); + for (BasicMessageConsumer consumer : _consumers.values()) + { + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); + } } else { @@ -1387,3 +1355,4 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().sync(); } } + diff --git a/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java index 5c5ad66777..c8ee61685c 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java @@ -5,14 +5,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; -import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; -import javax.jms.TextMessage; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -135,41 +133,5 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get()); } - /** - * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer. - * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them. - * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue. - * Try to receive all 10 messages. - */ - public void testConnectionStop() throws Exception - { - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10"); - Connection con = getConnection(); - con.start(); - Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}"); - - MessageProducer prod = ssn.createProducer(queue); - for (int i=0; i<10;i++) - { - prod.send(ssn.createTextMessage("Msg" + i)); - } - - MessageConsumer consumer = ssn.createConsumer(queue); - // This is to ensure we get the first client to prefetch. - Message msg = consumer.receive(1000); - assertNotNull("The first consumer should get one message",msg); - con.stop(); - - Connection con2 = getConnection(); - con2.start(); - Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer2 = ssn2.createConsumer(queue); - for (int i=0; i<9;i++) - { - TextMessage m = (TextMessage)consumer2.receive(1000); - assertNotNull("The second consumer should get 9 messages, but received only " + i,m); - } - } - } + |
