diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-04-03 21:21:57 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-04-03 21:21:57 +0000 |
| commit | 53d36c67f7959276d6923fb33f90f44e1a9ec095 (patch) | |
| tree | c0e20616bbd2352a02447a2a3c1e9c3f1b58b20f /qpid/java/systests/src | |
| parent | b53577acedf415b475ac3358728b909b4e3177dd (diff) | |
| download | qpid-python-53d36c67f7959276d6923fb33f90f44e1a9ec095.tar.gz | |
QPID-3927: add a systest which would highlight the underlying issue by failing to receive all messages present on the priority queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1309155 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
| -rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java | 100 |
1 files changed, 100 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java index 7cb58ff1ed..a6c9885568 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -21,8 +21,10 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -30,12 +32,15 @@ import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.naming.NamingException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class PriorityQueueTest extends QpidBrokerTestCase { @@ -197,4 +202,99 @@ public class PriorityQueueTest extends QpidBrokerTestCase return send; } + + /** + * Test that after sending an initial message with priority 0, it is able to be repeatedly reflected back to the queue using + * default priority and then consumed again, with separate transacted sessions with prefetch 1 for producer and consumer. + * + * Highlighted defect with PriorityQueues resolved in QPID-3927. + */ + public void testMessageReflectionWithPriorityIncreaseOnTransactedSessionsWithPrefetch1() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1"); + Connection conn = getConnection(); + conn.start(); + assertEquals("Prefetch not reset", 1, ((AMQConnection) conn).getMaxPrefetch()); + + final Session producerSess = conn.createSession(true, Session.SESSION_TRANSACTED); + final Session consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED); + + //declare a priority queue with 10 priorities + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-priorities",10); + ((AMQSession<?,?>) producerSess).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments); + + Queue queue = producerSess.createQueue(getTestQueueName()); + + //create the consumer, producer, add message listener + CountDownLatch latch = new CountDownLatch(5); + MessageConsumer cons = producerSess.createConsumer(queue); + MessageProducer producer = producerSess.createProducer(queue); + + ReflectingMessageListener listener = new ReflectingMessageListener(producerSess,producer,consumerSess,latch); + cons.setMessageListener(listener); + + //Send low priority 0 message to kick start the asynchronous reflection process + producer.setPriority(0); + producer.send(nextMessage(1, true, producerSess, producer)); + producerSess.commit(); + + //wait for the reflection process to complete + assertTrue("Test process failed to complete in allowed time", latch.await(10, TimeUnit.SECONDS)); + assertNull("Unexpected throwable encountered", listener.getThrown()); + } + + private static class ReflectingMessageListener implements MessageListener + { + private Session _prodSess; + private Session _consSess; + private CountDownLatch _latch; + private MessageProducer _prod; + private long _origCount; + private Throwable _lastThrown; + + public ReflectingMessageListener(final Session prodSess, final MessageProducer prod, + final Session consSess, final CountDownLatch latch) + { + _latch = latch; + _origCount = _latch.getCount(); + _prodSess = prodSess; + _consSess = consSess; + _prod = prod; + } + + @Override + public void onMessage(final Message message) + { + try + { + _latch.countDown(); + long msgNum = _origCount - _latch.getCount(); + System.out.println("Received message " + msgNum + " with ID: " + message.getIntProperty("msg")); + + if(_latch.getCount() > 0) + { + //reflect the message, updating its ID and using default priority + message.clearProperties(); + message.setIntProperty("msg", (int) msgNum + 1); + _prod.setPriority(Message.DEFAULT_PRIORITY); + _prod.send(message); + _prodSess.commit(); + } + + //commit the consumer session to consume the message + _consSess.commit(); + } + catch(Throwable t) + { + t.printStackTrace(); + _lastThrown = t; + } + } + + public Throwable getThrown() + { + return _lastThrown; + } + } } |
