summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2009-12-08 04:05:04 +0000
committerRobert Gemmell <robbie@apache.org>2009-12-08 04:05:04 +0000
commit10adf40dcbe9378662b365de0ac57c57296d026b (patch)
treec9853783adce0af133eba2881879f317298045dd /java/broker/src
parentcb3edd774a322d0b62776ff25f519ecfb08bd77f (diff)
downloadqpid-python-10adf40dcbe9378662b365de0ac57c57296d026b.tar.gz
QPID-2177: unit and system testing for the new flow controlled related attributes of the Queue MBean
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@888250 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java61
1 files changed, 61 insertions, 0 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 3bb8d397be..d6bdacee86 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -318,6 +318,67 @@ public class AMQQueueMBeanTest extends TestCase
}
}
+
+ public void testFlowControlProperties() throws Exception
+ {
+ assertTrue(_queueMBean.getCapacity() == 0);
+ assertTrue(_queueMBean.getFlowResumeCapacity() == 0);
+ assertFalse(_queueMBean.isFlowOverfull());
+
+ //capacity currently 0, try setting FlowResumeCapacity above this
+ try
+ {
+ _queueMBean.setFlowResumeCapacity(1L);
+ fail("Should have failed to allow setting FlowResumeCapacity above Capacity");
+ }
+ catch (IllegalArgumentException ex)
+ {
+ //expected exception
+ assertTrue(_queueMBean.getFlowResumeCapacity() == 0);
+ }
+
+ //(FlowResume)Capacity currently 0, set both to 2 then try setting Capacity below this
+ _queueMBean.setCapacity(2L);
+ assertTrue(_queueMBean.getCapacity() == 2L);
+ _queueMBean.setFlowResumeCapacity(2L);
+ assertTrue(_queueMBean.getFlowResumeCapacity() == 2L);
+
+ try
+ {
+ _queueMBean.setCapacity(1L);
+ fail("Should have failed to allow setting Capacity below FlowResumeCapacity");
+ }
+ catch (IllegalArgumentException ex)
+ {
+ //expected exception
+ assertTrue(_queueMBean.getCapacity() == 2);
+ }
+
+ //set (FlowResume)Capacity to MESSAGE_SIZE +1 then add a message to the queue
+ _queueMBean.setCapacity(MESSAGE_SIZE + 1);
+ _queueMBean.setFlowResumeCapacity(MESSAGE_SIZE + 1);
+
+ AMQChannel channel = new AMQChannel(_protocolSession, 1, _messageStore);
+ sendMessages(1, true);
+ _queue.checkCapacity(channel);
+
+ assertFalse(_queueMBean.isFlowOverfull());
+ assertFalse(channel.getBlocking());
+
+ //add another message then check queue is now overfull and channel blocked
+ sendMessages(1, true);
+ _queue.checkCapacity(channel);
+
+ assertTrue(_queueMBean.isFlowOverfull());
+ assertTrue(channel.getBlocking());
+
+ //set FlowResumeCapacity to 2x MESSAGE_SIZE and check queue is now underfull and channel unblocked
+ _queueMBean.setCapacity(2 * MESSAGE_SIZE);//must increase capacity too
+ _queueMBean.setFlowResumeCapacity(2 * MESSAGE_SIZE);
+
+ assertFalse(_queueMBean.isFlowOverfull());
+ assertFalse(channel.getBlocking());
+ }
private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException
{