diff options
Diffstat (limited to 'java/broker/src/test')
4 files changed, 141 insertions, 21 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 4716f6691a..c50770d7ba 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -351,7 +351,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } - public void unload() throws UnableToFlowMessageException + public void unload() { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index f73bafd3b4..623f57b224 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -24,18 +24,20 @@ import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.txn.NonTransactionalContext; import java.util.ArrayList; public class AMQPriorityQueueTest extends SimpleAMQQueueTest { - private static final long MESSAGE_SIZE = 100L; + private static final int PRIORITIES = 3; @Override protected void setUp() throws Exception - { + { _arguments = new FieldTable(); - _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3); + _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES); super.setUp(); } @@ -84,7 +86,6 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest int index = 1; for (QueueEntry qe : msgs) { - System.err.println(index + ":" + qe.getMessage().getMessageId()); index++; } @@ -96,16 +97,91 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest protected AMQMessage createMessage(byte i) throws AMQException { AMQMessage message = super.createMessage(); - - ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i); + + ((BasicContentHeaderProperties) message.getContentHeaderBody().properties).setPriority(i); return message; } - @Override - public void testMessagesFlowToDisk() throws AMQException, InterruptedException + + public void testMessagesFlowToDiskWithPriority() throws AMQException, InterruptedException { - //Disable this test pending completion of QPID-1637 + int PRIORITIES = 1; + FieldTable arguments = new FieldTable(); + arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES); + + // Create IncomingMessage and nondurable queue + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + + //Create a priorityQueue + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments); + + MESSAGE_SIZE = 1; + long MEMORY_MAX = PRIORITIES * 2; + int MESSAGE_COUNT = (int) MEMORY_MAX * 2; + //Set the Memory Usage to be very low + _queue.setMemoryUsageMaximum(MEMORY_MAX); + + for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++) + { + sendMessage(txnContext, (msgCount % 10)); + } + + //Check that we can hold 10 messages without flowing + assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageMaximum()); + assertEquals(_queue.getMemoryUsageMaximum(), _queue.getMemoryUsageCurrent()); + assertTrue("Queue is flowed.", !_queue.isFlowed()); + + // Send another and ensure we are flowed + sendMessage(txnContext); + + assertTrue("Queue is not flowed.", _queue.isFlowed()); + assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount()); + assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent()); + + + //send another 99 so there are 200msgs in total on the queue + for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++) + { + sendMessage(txnContext); + + long usage = _queue.getMemoryUsageCurrent(); + assertTrue("Queue has gone over quota:" + usage, + usage <= _queue.getMemoryUsageMaximum()); + + assertTrue("Queue has a negative quota:" + usage, usage > 0); + + } + assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); + assertTrue("Queue is not flowed.", _queue.isFlowed()); + + _queue.registerSubscription(_subscription, false); + + int slept = 0; + while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10) + { + Thread.sleep(500); + slept++; + } + + //Ensure the messages are retreived + assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); + + //Check the queue is still within it's limits. + assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(), + _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum()); + + assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0); + + for (int index = 0; index < MESSAGE_COUNT; index++) + { + // Ensure that we have received the messages and it wasn't flushed to disk before we received it. + AMQMessage message = _subscription.getMessages().get(index); + assertNotNull("Message:" + message.debugIdentity() + " was null.", message); + } + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 0e2b17914c..5d7fa21d56 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -20,7 +20,6 @@ package org.apache.qpid.server.queue; * */ -import junit.framework.AssertionFailedError; import junit.framework.TestCase; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; @@ -60,7 +59,7 @@ public class SimpleAMQQueueTest extends TestCase protected FieldTable _arguments = null; MessagePublishInfo info = new MessagePublishInfoImpl(); - private static long MESSAGE_SIZE = 100; + protected static long MESSAGE_SIZE = 100; @Override protected void setUp() throws Exception @@ -368,7 +367,7 @@ public class SimpleAMQQueueTest extends TestCase long MEMORY_MAX = 500; int MESSAGE_COUNT = (int) MEMORY_MAX * 2; //Set the Memory Usage to be very low - _queue.setMemoryUsageMaximum(MEMORY_MAX); + _queue.setMemoryUsageMaximum(MEMORY_MAX); for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++) { @@ -395,7 +394,7 @@ public class SimpleAMQQueueTest extends TestCase assertTrue("Queue has gone over quota:" + usage, usage <= _queue.getMemoryUsageMaximum()); - assertTrue("Queue has a negative quota:" + usage,usage > 0); + assertTrue("Queue has a negative quota:" + usage, usage > 0); } assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); @@ -412,13 +411,14 @@ public class SimpleAMQQueueTest extends TestCase } //Ensure the messages are retreived - assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); + assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); //Check the queue is still within it's limits. - assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(), - _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum()); + long current = _queue.getMemoryUsageCurrent(); + assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() , + current <= _queue.getMemoryUsageMaximum()); - assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0); + assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0); for (int index = 0; index < MESSAGE_COUNT; index++) { @@ -426,10 +426,52 @@ public class SimpleAMQQueueTest extends TestCase AMQMessage message = _subscription.getMessages().get(index); assertNotNull("Message:" + message.debugIdentity() + " was null.", message); } + } + + public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException + { + // Create IncomingMessage and nondurable queue + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + + MESSAGE_SIZE = 1; + long MEMORY_MAX = 10; + int MESSAGE_COUNT = (int) MEMORY_MAX; + //Set the Memory Usage to be very low + _queue.setMemoryUsageMaximum(MEMORY_MAX); + for (int msgCount = 0; msgCount < MESSAGE_COUNT; msgCount++) + { + sendMessage(txnContext); + } + + //Check that we can hold all messages without flowing + assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); + assertTrue("Queue is flowed.", !_queue.isFlowed()); + + // Send anothe and ensure we are flowed + sendMessage(txnContext); + assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); + assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent()); + assertTrue("Queue is not flowed.", _queue.isFlowed()); + + _queue.setMemoryUsageMaximum(0L); + + //Give the purger time to work + Thread.sleep(200); + + assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); + assertEquals(0L , _queue.getMemoryUsageCurrent()); + assertTrue("Queue is not flowed.", _queue.isFlowed()); + + } + + protected void sendMessage(TransactionalContext txnContext) throws AMQException + { + sendMessage(txnContext, 5); } - private void sendMessage(TransactionalContext txnContext) throws AMQException + protected void sendMessage(TransactionalContext txnContext, int priority) throws AMQException { IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog); @@ -438,6 +480,7 @@ public class SimpleAMQQueueTest extends TestCase contentHeaderBody.bodySize = MESSAGE_SIZE; contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setPriority((byte) priority); msg.setContentHeaderBody(contentHeaderBody); long messageId = msg.getMessageId(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index 0c33b406e6..40961a3d2e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -48,8 +48,9 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase //This is +2 because: // 1 - asyncDelivery Thread - // 2 - queueInhalerThread - assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount()); + // 2 - queue InhalerThread + // 3 - queue PurgerThread + assertEquals("References not increased", initialCount + 3, ReferenceCountingExecutorService.getInstance().getReferenceCount()); queue.stop(); |
