diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-03-30 16:01:23 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-03-30 16:01:23 +0000 |
| commit | 34e71040bd3d94549d6ad100db2e7f481e840cbe (patch) | |
| tree | bed48cfdf43b33108164d0fad33f085fbdf0f008 /java/broker/src/test | |
| parent | c6a894d5e9f86527443086ae67d7dff668f5d800 (diff) | |
| download | qpid-python-34e71040bd3d94549d6ad100db2e7f481e840cbe.tar.gz | |
QPID-3167: add a unit test of SimpleAMQQueue#processQueue to check delivery when subscriptions with unique selectors are in use
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1087000 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
11 files changed, 152 insertions, 13 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 7b58966a4c..9e831b2a8e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -276,7 +276,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase static ContentHeaderBody getContentHeader(FieldTable headers) { ContentHeaderBody header = new ContentHeaderBody(); - header.properties = getProperties(headers); + header.setProperties(getProperties(headers)); return header; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index f72961c03c..403a290a0f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -396,7 +396,7 @@ public class TopicExchangeTest extends InternalBrokerBaseCase IncomingMessage message = new IncomingMessage(info); final ContentHeaderBody chb = new ContentHeaderBody(); BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - chb.properties = props; + chb.setProperties(props); message.setContentHeaderBody(chb); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index d52f4c03f3..3961b3b355 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -96,7 +96,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest AMQMessage msg = super.createMessage(id); BasicContentHeaderProperties props = new BasicContentHeaderProperties(); props.setPriority(i); - msg.getContentHeaderBody().properties = props; + msg.getContentHeaderBody().setProperties(props); return msg; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 0707cab3d5..a8bddcf6bf 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - contentHeaderBody.properties = props; + contentHeaderBody.setProperties(props); contentHeaderBody.bodySize = size; // in bytes IncomingMessage message = new IncomingMessage(publish); message.setContentHeaderBody(contentHeaderBody); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 5b72cfac40..365353e734 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -402,8 +402,8 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes - contentHeaderBody.properties = new BasicContentHeaderProperties(); - ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1)); + contentHeaderBody.setProperties(new BasicContentHeaderProperties()); + ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1)); IncomingMessage msg = new IncomingMessage(publish); msg.setContentHeaderBody(contentHeaderBody); return msg; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 04608275a3..0f5374b3e5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -126,7 +126,7 @@ public class AckTest extends InternalBrokerBaseCase //IncomingMessage msg2 = null; BasicContentHeaderProperties b = new BasicContentHeaderProperties(); ContentHeaderBody cb = new ContentHeaderBody(); - cb.properties = b; + cb.setProperties(b); if (persistent) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 67d093d00a..41ca751684 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -660,8 +660,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase // Create IncomingMessage and nondurable queue final IncomingMessage msg = new IncomingMessage(info); ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.properties = new BasicContentHeaderProperties(); - ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + contentHeaderBody.setProperties(new BasicContentHeaderProperties()); + ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2); msg.setContentHeaderBody(contentHeaderBody); final ArrayList<BaseQueue> qs = new ArrayList<BaseQueue>(); @@ -707,6 +707,111 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } + /** + * processQueue() is used when asynchronously delivering messages to + * subscriptions which could not be delivered immediately during the + * enqueue() operation. + * + * A defect within the method would mean that delivery of these messages may + * not occur should the Runner stop before all messages have been processed. + * Such a defect was discovered when Selectors were used such that one and + * only one subscription can/will accept any given messages, but multiple + * subscriptions are present, and one of the earlier subscriptions receives + * more messages than the others. + * + * This test is to validate that the processQueue() method is able to + * correctly deliver all of the messages present for asynchronous delivery + * to subscriptions in such a scenario. + */ + public void testProcessQueueWithUniqueSelectors() throws Exception + { + TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); + SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, "testOwner",false, + false, _virtualHost, factory, null) + { + @Override + public void deliverAsync(Subscription sub) + { + // do nothing, i.e prevent deliveries by the SubFlushRunner + // when registering the new subscriptions + } + }; + + // retrieve the QueueEntryList the queue creates and insert the test + // messages, thus avoiding straight-through delivery attempts during + //enqueue() process. + QueueEntryList list = factory.getQueueEntryList(); + assertNotNull("QueueEntryList should have been created", list); + + QueueEntry msg1 = list.add(createMessage(1L)); + QueueEntry msg2 = list.add(createMessage(2L)); + QueueEntry msg3 = list.add(createMessage(3L)); + QueueEntry msg4 = list.add(createMessage(4L)); + QueueEntry msg5 = list.add(createMessage(5L)); + + // Create lists of the entries each subscription should be interested + // in.Bias over 50% of the messages to the first subscription so that + // the later subscriptions reject them and report being done before + // the first subscription as the processQueue method proceeds. + List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3); + List<QueueEntry> msgListSub2 = createEntriesList(msg4); + List<QueueEntry> msgListSub3 = createEntriesList(msg5); + + MockSubscription sub1 = new MockSubscription(msgListSub1); + MockSubscription sub2 = new MockSubscription(msgListSub2); + MockSubscription sub3 = new MockSubscription(msgListSub3); + + // register the subscriptions + testQueue.registerSubscription(sub1, false); + testQueue.registerSubscription(sub2, false); + testQueue.registerSubscription(sub3, false); + + //check that no messages have been delivered to the + //subscriptions during registration + assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size()); + assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size()); + assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); + + // call processQueue to deliver the messages + testQueue.processQueue(new QueueRunner(testQueue, 1) + { + @Override + public void run() + { + // we dont actually want/need this runner to do any work + // because we we are already doing it! + } + }); + + // check expected messages delivered to correct consumers + verifyRecievedMessages(msgListSub1, sub1.getMessages()); + verifyRecievedMessages(msgListSub2, sub2.getMessages()); + verifyRecievedMessages(msgListSub3, sub3.getMessages()); + } + + private List<QueueEntry> createEntriesList(QueueEntry... entries) + { + ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>(); + for (QueueEntry entry : entries) + { + entriesList.add(entry); + } + return entriesList; + } + + private void verifyRecievedMessages(List<QueueEntry> expected, + List<QueueEntry> delivered) + { + assertEquals("Consumer did not receive the expected number of messages", + expected.size(), delivered.size()); + + for (QueueEntry msg : expected) + { + assertTrue("Consumer did not recieve msg: " + + msg.getMessage().getMessageNumber(), delivered.contains(msg)); + } + } + public class TestMessage extends AMQMessage { private final long _tag; @@ -747,4 +852,20 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase AMQMessage messageA = new TestMessage(id, id, info); return messageA; } + + class TestSimpleQueueEntryListFactory implements QueueEntryListFactory + { + QueueEntryList _list; + + public QueueEntryList createQueueEntryList(AMQQueue queue) + { + _list = new SimpleQueueEntryList(queue); + return _list; + } + + public QueueEntryList getQueueEntryList() + { + return _list; + } + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 3ebe631f62..62ceb68208 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -589,7 +589,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase headerBody.classId = BasicConsumeBodyImpl.CLASS_ID; headerBody.bodySize = 0; - headerBody.properties = properties; + headerBody.setProperties(properties); try { diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java index a75cbe8662..2d41eb9899 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java @@ -102,7 +102,7 @@ public class ReferenceCountingTest extends QpidTestCase ContentHeaderBody chb = new ContentHeaderBody(); BasicContentHeaderProperties bchp = new BasicContentHeaderProperties(); bchp.setDeliveryMode((byte)2); - chb.properties = bchp; + chb.setProperties(bchp); return chb; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 1ec134e90e..6fbc627d8c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription; */ import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -45,6 +46,7 @@ public class MockSubscription implements Subscription private State _state = State.ACTIVE; private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); private final Lock _stateChangeLock = new ReentrantLock(); + private List<QueueEntry> _acceptEntries = null; private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this); @@ -54,6 +56,15 @@ public class MockSubscription implements Subscription // Create a simple ID that increments for ever new Subscription private final long _subscriptionID = idGenerator.getAndIncrement(); + public MockSubscription() + { + } + + public MockSubscription(List<QueueEntry> acceptEntries) + { + _acceptEntries = acceptEntries; + } + public void close() { _closed = true; @@ -119,8 +130,15 @@ public class MockSubscription implements Subscription _stateChangeLock.lock(); } - public boolean hasInterest(QueueEntry msg) + public boolean hasInterest(QueueEntry entry) { + if(_acceptEntries != null) + { + //simulate selector behaviour, only signal + //interest in the dictated queue entries + return _acceptEntries.contains(entry); + } + return true; } diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 925b161118..ff94942457 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -243,7 +243,7 @@ public class InternalBrokerBaseCase extends QpidTestCase //Make Message Persistent properties.setDeliveryMode((byte) 2); - _headerBody.properties = properties; + _headerBody.setProperties(properties); channel.publishContentHeader(_headerBody); } |
