summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-04-03 21:21:57 +0000
committerRobert Gemmell <robbie@apache.org>2012-04-03 21:21:57 +0000
commit53d36c67f7959276d6923fb33f90f44e1a9ec095 (patch)
treec0e20616bbd2352a02447a2a3c1e9c3f1b58b20f /qpid/java/systests/src
parentb53577acedf415b475ac3358728b909b4e3177dd (diff)
downloadqpid-python-53d36c67f7959276d6923fb33f90f44e1a9ec095.tar.gz
QPID-3927: add a systest which would highlight the underlying issue by failing to receive all messages present on the priority queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1309155 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java100
1 files changed, 100 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
index 7cb58ff1ed..a6c9885568 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
@@ -21,8 +21,10 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -30,12 +32,15 @@ import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.NamingException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class PriorityQueueTest extends QpidBrokerTestCase
{
@@ -197,4 +202,99 @@ public class PriorityQueueTest extends QpidBrokerTestCase
return send;
}
+
+ /**
+ * Test that after sending an initial message with priority 0, it is able to be repeatedly reflected back to the queue using
+ * default priority and then consumed again, with separate transacted sessions with prefetch 1 for producer and consumer.
+ *
+ * Highlighted defect with PriorityQueues resolved in QPID-3927.
+ */
+ public void testMessageReflectionWithPriorityIncreaseOnTransactedSessionsWithPrefetch1() throws Exception
+ {
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1");
+ Connection conn = getConnection();
+ conn.start();
+ assertEquals("Prefetch not reset", 1, ((AMQConnection) conn).getMaxPrefetch());
+
+ final Session producerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ final Session consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ //declare a priority queue with 10 priorities
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-priorities",10);
+ ((AMQSession<?,?>) producerSess).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments);
+
+ Queue queue = producerSess.createQueue(getTestQueueName());
+
+ //create the consumer, producer, add message listener
+ CountDownLatch latch = new CountDownLatch(5);
+ MessageConsumer cons = producerSess.createConsumer(queue);
+ MessageProducer producer = producerSess.createProducer(queue);
+
+ ReflectingMessageListener listener = new ReflectingMessageListener(producerSess,producer,consumerSess,latch);
+ cons.setMessageListener(listener);
+
+ //Send low priority 0 message to kick start the asynchronous reflection process
+ producer.setPriority(0);
+ producer.send(nextMessage(1, true, producerSess, producer));
+ producerSess.commit();
+
+ //wait for the reflection process to complete
+ assertTrue("Test process failed to complete in allowed time", latch.await(10, TimeUnit.SECONDS));
+ assertNull("Unexpected throwable encountered", listener.getThrown());
+ }
+
+ private static class ReflectingMessageListener implements MessageListener
+ {
+ private Session _prodSess;
+ private Session _consSess;
+ private CountDownLatch _latch;
+ private MessageProducer _prod;
+ private long _origCount;
+ private Throwable _lastThrown;
+
+ public ReflectingMessageListener(final Session prodSess, final MessageProducer prod,
+ final Session consSess, final CountDownLatch latch)
+ {
+ _latch = latch;
+ _origCount = _latch.getCount();
+ _prodSess = prodSess;
+ _consSess = consSess;
+ _prod = prod;
+ }
+
+ @Override
+ public void onMessage(final Message message)
+ {
+ try
+ {
+ _latch.countDown();
+ long msgNum = _origCount - _latch.getCount();
+ System.out.println("Received message " + msgNum + " with ID: " + message.getIntProperty("msg"));
+
+ if(_latch.getCount() > 0)
+ {
+ //reflect the message, updating its ID and using default priority
+ message.clearProperties();
+ message.setIntProperty("msg", (int) msgNum + 1);
+ _prod.setPriority(Message.DEFAULT_PRIORITY);
+ _prod.send(message);
+ _prodSess.commit();
+ }
+
+ //commit the consumer session to consume the message
+ _consSess.commit();
+ }
+ catch(Throwable t)
+ {
+ t.printStackTrace();
+ _lastThrown = t;
+ }
+ }
+
+ public Throwable getThrown()
+ {
+ return _lastThrown;
+ }
+ }
}