summaryrefslogtreecommitdiff
path: root/java/broker/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/test')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java94
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java61
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java5
4 files changed, 141 insertions, 21 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 4716f6691a..c50770d7ba 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -351,7 +351,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void unload() throws UnableToFlowMessageException
+ public void unload()
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index f73bafd3b4..623f57b224 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -24,18 +24,20 @@ import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.txn.NonTransactionalContext;
import java.util.ArrayList;
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
- private static final long MESSAGE_SIZE = 100L;
+ private static final int PRIORITIES = 3;
@Override
protected void setUp() throws Exception
- {
+ {
_arguments = new FieldTable();
- _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3);
+ _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
super.setUp();
}
@@ -84,7 +86,6 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
int index = 1;
for (QueueEntry qe : msgs)
{
- System.err.println(index + ":" + qe.getMessage().getMessageId());
index++;
}
@@ -96,16 +97,91 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
protected AMQMessage createMessage(byte i) throws AMQException
{
AMQMessage message = super.createMessage();
-
- ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i);
+
+ ((BasicContentHeaderProperties) message.getContentHeaderBody().properties).setPriority(i);
return message;
}
- @Override
- public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+
+ public void testMessagesFlowToDiskWithPriority() throws AMQException, InterruptedException
{
- //Disable this test pending completion of QPID-1637
+ int PRIORITIES = 1;
+ FieldTable arguments = new FieldTable();
+ arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
+
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ //Create a priorityQueue
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments);
+
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = PRIORITIES * 2;
+ int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
+ {
+ sendMessage(txnContext, (msgCount % 10));
+ }
+
+ //Check that we can hold 10 messages without flowing
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageMaximum());
+ assertEquals(_queue.getMemoryUsageMaximum(), _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send another and ensure we are flowed
+ sendMessage(txnContext);
+
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+ assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
+
+
+ //send another 99 so there are 200msgs in total on the queue
+ for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
+ {
+ sendMessage(txnContext);
+
+ long usage = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + usage,
+ usage <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + usage, usage > 0);
+
+ }
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.registerSubscription(_subscription, false);
+
+ int slept = 0;
+ while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+ {
+ Thread.sleep(500);
+ slept++;
+ }
+
+ //Ensure the messages are retreived
+ assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+
+ //Check the queue is still within it's limits.
+ assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
+ _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
+
+ for (int index = 0; index < MESSAGE_COUNT; index++)
+ {
+ // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+ AMQMessage message = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
+ }
+
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 0e2b17914c..5d7fa21d56 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -20,7 +20,6 @@ package org.apache.qpid.server.queue;
*
*/
-import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -60,7 +59,7 @@ public class SimpleAMQQueueTest extends TestCase
protected FieldTable _arguments = null;
MessagePublishInfo info = new MessagePublishInfoImpl();
- private static long MESSAGE_SIZE = 100;
+ protected static long MESSAGE_SIZE = 100;
@Override
protected void setUp() throws Exception
@@ -368,7 +367,7 @@ public class SimpleAMQQueueTest extends TestCase
long MEMORY_MAX = 500;
int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
//Set the Memory Usage to be very low
- _queue.setMemoryUsageMaximum(MEMORY_MAX);
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
{
@@ -395,7 +394,7 @@ public class SimpleAMQQueueTest extends TestCase
assertTrue("Queue has gone over quota:" + usage,
usage <= _queue.getMemoryUsageMaximum());
- assertTrue("Queue has a negative quota:" + usage,usage > 0);
+ assertTrue("Queue has a negative quota:" + usage, usage > 0);
}
assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
@@ -412,13 +411,14 @@ public class SimpleAMQQueueTest extends TestCase
}
//Ensure the messages are retreived
- assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+ assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
//Check the queue is still within it's limits.
- assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
- _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
+ long current = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() ,
+ current <= _queue.getMemoryUsageMaximum());
- assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0);
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
for (int index = 0; index < MESSAGE_COUNT; index++)
{
@@ -426,10 +426,52 @@ public class SimpleAMQQueueTest extends TestCase
AMQMessage message = _subscription.getMessages().get(index);
assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
}
+ }
+
+ public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = 10;
+ int MESSAGE_COUNT = (int) MEMORY_MAX;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+
+ //Check that we can hold all messages without flowing
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send anothe and ensure we are flowed
+ sendMessage(txnContext);
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.setMemoryUsageMaximum(0L);
+
+ //Give the purger time to work
+ Thread.sleep(200);
+
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(0L , _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ }
+
+ protected void sendMessage(TransactionalContext txnContext) throws AMQException
+ {
+ sendMessage(txnContext, 5);
}
- private void sendMessage(TransactionalContext txnContext) throws AMQException
+ protected void sendMessage(TransactionalContext txnContext, int priority) throws AMQException
{
IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
@@ -438,6 +480,7 @@ public class SimpleAMQQueueTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE;
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setPriority((byte) priority);
msg.setContentHeaderBody(contentHeaderBody);
long messageId = msg.getMessageId();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 0c33b406e6..40961a3d2e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -48,8 +48,9 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase
//This is +2 because:
// 1 - asyncDelivery Thread
- // 2 - queueInhalerThread
- assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount());
+ // 2 - queue InhalerThread
+ // 3 - queue PurgerThread
+ assertEquals("References not increased", initialCount + 3, ReferenceCountingExecutorService.getInstance().getReferenceCount());
queue.stop();