diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-09-18 16:12:46 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-09-18 16:12:46 +0000 |
| commit | 09f60acd6ba474bfeed068f10d966938f806ff77 (patch) | |
| tree | 6f3eaf7a84674f5f8bcb1c7b3211770d0b245907 /qpid/java/systests/src | |
| parent | b208766dcaf114eac162d6f230fb05370b01e04b (diff) | |
| download | qpid-python-09f60acd6ba474bfeed068f10d966938f806ff77.tar.gz | |
QPID-1286: make sure priority queues don't mess with deleted subscriptions
AMQPriorityQueue: don't advance deleted subscriptions
AMQPriorityQueueTest: Add test class for priority queues
SimpleAMQQueueTest: Add more tests
PriorityTest: Check for more message orders
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@696686 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
| -rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java | 150 |
1 files changed, 102 insertions, 48 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java index 0dbf95052f..b8e5980f26 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java @@ -52,9 +52,19 @@ public class PriorityTest extends TestCase protected final String VHOST = "/test"; protected final String QUEUE = "PriorityQueue"; - private static final int MSG_COUNT = 50; + private Context context = null; + private Connection producerConnection; + private MessageProducer producer; + private Session producerSession; + private Queue queue; + private Connection consumerConnection; + private Session consumerSession; + + + private MessageConsumer consumer; + protected void setUp() throws Exception { super.setUp(); @@ -64,7 +74,21 @@ public class PriorityTest extends TestCase TransportConnection.createVMBroker(1); } + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); + env.put("queue.queue", QUEUE); + + context = factory.getInitialContext(env); + producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producerConnection.start(); + + consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } private boolean usingInVMBroker() @@ -74,6 +98,8 @@ public class PriorityTest extends TestCase protected void tearDown() throws Exception { + producerConnection.close(); + consumerConnection.close(); if (usingInVMBroker()) { TransportConnection.killAllVMBrokers(); @@ -83,65 +109,25 @@ public class PriorityTest extends TestCase public void testPriority() throws JMSException, NamingException, AMQException { - InitialContextFactory factory = new PropertiesFileInitialContextFactory(); - - Hashtable<String, String> env = new Hashtable<String, String>(); - - env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); - env.put("queue.queue", QUEUE); - - Context context = factory.getInitialContext(env); - - Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-priorities",10); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - - Queue queue = new AMQQueue("amq.direct",QUEUE); - + queue = new AMQQueue("amq.direct",QUEUE); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); - - - - - - - producerConnection.start(); - - - MessageProducer producer = producerSession.createProducer(queue); - - - - + producer = producerSession.createProducer(queue); for (int msg = 0; msg < MSG_COUNT; msg++) { producer.setPriority(msg % 10); producer.send(nextMessage(msg, false, producerSession, producer)); } - producer.close(); producerSession.close(); producerConnection.close(); - - Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(queue); - - - - + consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - Message received; - //Receive Message 0 - StringBuilder buf = new StringBuilder(); int receivedCount = 0; Message previous = null; int messageCount = 0; @@ -158,10 +144,78 @@ public class PriorityTest extends TestCase } assertEquals("Incorrect number of message received", 50, receivedCount); - - producerSession.close(); - producer.close(); - + } + + public void testOddOrdering() throws AMQException, JMSException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-priorities",3); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + // In order ABC + producer.setPriority(9); + producer.send(nextMessage(1, false, producerSession, producer)); + producer.setPriority(4); + producer.send(nextMessage(2, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(3, false, producerSession, producer)); + + // Out of order BAC + producer.setPriority(4); + producer.send(nextMessage(4, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(5, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(6, false, producerSession, producer)); + + // Out of order BCA + producer.setPriority(4); + producer.send(nextMessage(7, false, producerSession, producer)); + producer.setPriority(1); + producer.send(nextMessage(8, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(9, false, producerSession, producer)); + + // Reverse order CBA + producer.setPriority(1); + producer.send(nextMessage(10, false, producerSession, producer)); + producer.setPriority(4); + producer.send(nextMessage(11, false, producerSession, producer)); + producer.setPriority(9); + producer.send(nextMessage(12, false, producerSession, producer)); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message msg = consumer.receive(500); + assertEquals(1, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(5, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(9, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(12, msg.getIntProperty("msg")); + + msg = consumer.receive(500); + assertEquals(2, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(4, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(7, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(11, msg.getIntProperty("msg")); + + msg = consumer.receive(500); + assertEquals(3, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(6, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(8, msg.getIntProperty("msg")); + msg = consumer.receive(500); + assertEquals(10, msg.getIntProperty("msg")); } private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException |
