summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-01-11 15:24:44 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-01-11 15:24:44 +0000
commit455cd59a5183e9e18bc47d7f0288636ecd5eec37 (patch)
tree798b639d682405588d6a3f845208226530ca6549 /java
parent01f8f7e488d44323fff9550aeefddc38bb99e0ca (diff)
downloadqpid-python-455cd59a5183e9e18bc47d7f0288636ecd5eec37.tar.gz
QPID-3604 Reverting the changes as it releases messages everytime the
channel is suspended. This results in several test failures. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1230088 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java43
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java40
3 files changed, 9 insertions, 77 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 784b75af10..48c4e3e3e6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
* to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
*/
- protected volatile boolean _usingDispatcherForCleanup;
+ private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -3570,3 +3570,4 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 49b77dcc7b..8395c8f4b7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -795,43 +795,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (suspend)
{
- synchronized (getMessageDeliveryLock())
- {
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
- Option.UNRELIABLE);
- sync();
- List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
- _prefetchedMessageTags.addAll(tags);
- }
- }
-
- _usingDispatcherForCleanup = true;
- syncDispatchQueue();
- _usingDispatcherForCleanup = false;
-
- RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
- RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
- RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
- + prefetched.size());
-
- for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
- {
- Range range = deliveredIter.next();
- all.add(range);
- }
-
- for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
- {
- Range range = prefetchedIter.next();
- all.add(range);
- }
-
- flushProcessed(all, false);
- getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
- getQpidSession().messageRelease(prefetched);
- sync();
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+ Option.UNRELIABLE);
+ }
}
else
{
@@ -1387,3 +1355,4 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().sync();
}
}
+
diff --git a/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
index 5c5ad66777..c8ee61685c 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
@@ -5,14 +5,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
-import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import javax.jms.TextMessage;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -135,41 +133,5 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase
assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get());
}
- /**
- * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer.
- * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them.
- * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue.
- * Try to receive all 10 messages.
- */
- public void testConnectionStop() throws Exception
- {
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10");
- Connection con = getConnection();
- con.start();
- Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
-
- MessageProducer prod = ssn.createProducer(queue);
- for (int i=0; i<10;i++)
- {
- prod.send(ssn.createTextMessage("Msg" + i));
- }
-
- MessageConsumer consumer = ssn.createConsumer(queue);
- // This is to ensure we get the first client to prefetch.
- Message msg = consumer.receive(1000);
- assertNotNull("The first consumer should get one message",msg);
- con.stop();
-
- Connection con2 = getConnection();
- con2.start();
- Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = ssn2.createConsumer(queue);
- for (int i=0; i<9;i++)
- {
- TextMessage m = (TextMessage)consumer2.receive(1000);
- assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
- }
- }
-
}
+