summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-09-18 16:12:46 +0000
committerAidan Skinner <aidan@apache.org>2008-09-18 16:12:46 +0000
commit09f60acd6ba474bfeed068f10d966938f806ff77 (patch)
tree6f3eaf7a84674f5f8bcb1c7b3211770d0b245907 /qpid/java/systests/src
parentb208766dcaf114eac162d6f230fb05370b01e04b (diff)
downloadqpid-python-09f60acd6ba474bfeed068f10d966938f806ff77.tar.gz
QPID-1286: make sure priority queues don't mess with deleted subscriptions
AMQPriorityQueue: don't advance deleted subscriptions AMQPriorityQueueTest: Add test class for priority queues SimpleAMQQueueTest: Add more tests PriorityTest: Check for more message orders git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@696686 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java150
1 files changed, 102 insertions, 48 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
index 0dbf95052f..b8e5980f26 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
@@ -52,9 +52,19 @@ public class PriorityTest extends TestCase
protected final String VHOST = "/test";
protected final String QUEUE = "PriorityQueue";
-
private static final int MSG_COUNT = 50;
+ private Context context = null;
+ private Connection producerConnection;
+ private MessageProducer producer;
+ private Session producerSession;
+ private Queue queue;
+ private Connection consumerConnection;
+ private Session consumerSession;
+
+
+ private MessageConsumer consumer;
+
protected void setUp() throws Exception
{
super.setUp();
@@ -64,7 +74,21 @@ public class PriorityTest extends TestCase
TransportConnection.createVMBroker(1);
}
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
+ env.put("queue.queue", QUEUE);
+
+ context = factory.getInitialContext(env);
+ producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producerConnection.start();
+
+ consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+ consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
}
private boolean usingInVMBroker()
@@ -74,6 +98,8 @@ public class PriorityTest extends TestCase
protected void tearDown() throws Exception
{
+ producerConnection.close();
+ consumerConnection.close();
if (usingInVMBroker())
{
TransportConnection.killAllVMBrokers();
@@ -83,65 +109,25 @@ public class PriorityTest extends TestCase
public void testPriority() throws JMSException, NamingException, AMQException
{
- InitialContextFactory factory = new PropertiesFileInitialContextFactory();
-
- Hashtable<String, String> env = new Hashtable<String, String>();
-
- env.put("connectionfactory.connection", "amqp://guest:guest@PRIORITY_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
- env.put("queue.queue", QUEUE);
-
- Context context = factory.getInitialContext(env);
-
- Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
-
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-priorities",10);
-
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
-
- Queue queue = new AMQQueue("amq.direct",QUEUE);
-
+ queue = new AMQQueue("amq.direct",QUEUE);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
-
-
-
-
-
-
- producerConnection.start();
-
-
- MessageProducer producer = producerSession.createProducer(queue);
-
-
-
-
+ producer = producerSession.createProducer(queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
producer.setPriority(msg % 10);
producer.send(nextMessage(msg, false, producerSession, producer));
}
-
producer.close();
producerSession.close();
producerConnection.close();
-
- Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
- Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
-
-
-
-
+ consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
-
Message received;
- //Receive Message 0
- StringBuilder buf = new StringBuilder();
int receivedCount = 0;
Message previous = null;
int messageCount = 0;
@@ -158,10 +144,78 @@ public class PriorityTest extends TestCase
}
assertEquals("Incorrect number of message received", 50, receivedCount);
-
- producerSession.close();
- producer.close();
-
+ }
+
+ public void testOddOrdering() throws AMQException, JMSException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-priorities",3);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ // In order ABC
+ producer.setPriority(9);
+ producer.send(nextMessage(1, false, producerSession, producer));
+ producer.setPriority(4);
+ producer.send(nextMessage(2, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(3, false, producerSession, producer));
+
+ // Out of order BAC
+ producer.setPriority(4);
+ producer.send(nextMessage(4, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(5, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(6, false, producerSession, producer));
+
+ // Out of order BCA
+ producer.setPriority(4);
+ producer.send(nextMessage(7, false, producerSession, producer));
+ producer.setPriority(1);
+ producer.send(nextMessage(8, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(9, false, producerSession, producer));
+
+ // Reverse order CBA
+ producer.setPriority(1);
+ producer.send(nextMessage(10, false, producerSession, producer));
+ producer.setPriority(4);
+ producer.send(nextMessage(11, false, producerSession, producer));
+ producer.setPriority(9);
+ producer.send(nextMessage(12, false, producerSession, producer));
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ Message msg = consumer.receive(500);
+ assertEquals(1, msg.getIntProperty("msg"));
+ msg = consumer.receive(500);
+ assertEquals(5, msg.getIntProperty("msg"));
+ msg = consumer.receive(500);
+ assertEquals(9, msg.getIntProperty("msg"));
+ msg = consumer.receive(500);
+ assertEquals(12, msg.getIntProperty("msg"));
+
+ msg = consumer.receive(500);
+ assertEquals(2, msg.getIntProperty("msg"));
+ msg = consumer.receive(500);
+ assertEquals(4, msg.getIntProperty("msg"));
+ msg = consumer.receive(500);
+ assertEquals(7, msg.getIntProperty("msg"));
+ msg = consumer.receive(500);
+ assertEquals(11, msg.getIntProperty("msg"));
+
+ msg = consumer.receive(500);
+ assertEquals(3, msg.getIntProperty("msg"));
+ msg = consumer.receive(500);
+ assertEquals(6, msg.getIntProperty("msg"));
+ msg = consumer.receive(500);
+ assertEquals(8, msg.getIntProperty("msg"));
+ msg = consumer.receive(500);
+ assertEquals(10, msg.getIntProperty("msg"));
}
private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException