diff options
Diffstat (limited to 'java/broker/src/test')
| -rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java | 25 | ||||
| -rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 14 |
2 files changed, 26 insertions, 13 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 623f57b224..800bb1ac9c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -134,17 +134,21 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest assertTrue("Queue is flowed.", !_queue.isFlowed()); // Send another and ensure we are flowed - sendMessage(txnContext); - + sendMessage(txnContext, 9); + + //Give the Purging Thread a chance to run + Thread.yield(); + Thread.sleep(500); + assertTrue("Queue is not flowed.", _queue.isFlowed()); - assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount()); - assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent()); + assertEquals("Queue contains more messages than expected.", MESSAGE_COUNT / 2 + 1, _queue.getMessageCount()); + assertEquals("Queue over memory quota.",MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent()); - //send another 99 so there are 200msgs in total on the queue - for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++) + //send another batch of messagse so the total in each queue is equal + for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) ; msgCount++) { - sendMessage(txnContext); + sendMessage(txnContext, (msgCount % 10)); long usage = _queue.getMemoryUsageCurrent(); assertTrue("Queue has gone over quota:" + usage, @@ -153,21 +157,22 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest assertTrue("Queue has a negative quota:" + usage, usage > 0); } - assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); + assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); assertTrue("Queue is not flowed.", _queue.isFlowed()); _queue.registerSubscription(_subscription, false); int slept = 0; - while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10) + while (_subscription.getQueueEntries().size() != MESSAGE_COUNT + 1 && slept < 10) { + Thread.yield(); Thread.sleep(500); slept++; } //Ensure the messages are retreived - assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); + assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT + 1, _subscription.getQueueEntries().size()); //Check the queue is still within it's limits. assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(), diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 5d7fa21d56..86d1948f20 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -434,7 +434,9 @@ public class SimpleAMQQueueTest extends TestCase NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); MESSAGE_SIZE = 1; - long MEMORY_MAX = 10; + /** Set to larger than the purge batch size. Default 100. + * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */ + long MEMORY_MAX = 500; int MESSAGE_COUNT = (int) MEMORY_MAX; //Set the Memory Usage to be very low _queue.setMemoryUsageMaximum(MEMORY_MAX); @@ -457,8 +459,14 @@ public class SimpleAMQQueueTest extends TestCase _queue.setMemoryUsageMaximum(0L); - //Give the purger time to work - Thread.sleep(200); + //Give the purger time to work maximum of 1s + int slept = 0; + while (_queue.getMemoryUsageCurrent() > 0 && slept < 5) + { + Thread.yield(); + Thread.sleep(200); + slept++; + } assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); assertEquals(0L , _queue.getMemoryUsageCurrent()); |
