diff options
Diffstat (limited to 'java/broker/src')
| -rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 3bb8d397be..d6bdacee86 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -318,6 +318,67 @@ public class AMQQueueMBeanTest extends TestCase } } + + public void testFlowControlProperties() throws Exception + { + assertTrue(_queueMBean.getCapacity() == 0); + assertTrue(_queueMBean.getFlowResumeCapacity() == 0); + assertFalse(_queueMBean.isFlowOverfull()); + + //capacity currently 0, try setting FlowResumeCapacity above this + try + { + _queueMBean.setFlowResumeCapacity(1L); + fail("Should have failed to allow setting FlowResumeCapacity above Capacity"); + } + catch (IllegalArgumentException ex) + { + //expected exception + assertTrue(_queueMBean.getFlowResumeCapacity() == 0); + } + + //(FlowResume)Capacity currently 0, set both to 2 then try setting Capacity below this + _queueMBean.setCapacity(2L); + assertTrue(_queueMBean.getCapacity() == 2L); + _queueMBean.setFlowResumeCapacity(2L); + assertTrue(_queueMBean.getFlowResumeCapacity() == 2L); + + try + { + _queueMBean.setCapacity(1L); + fail("Should have failed to allow setting Capacity below FlowResumeCapacity"); + } + catch (IllegalArgumentException ex) + { + //expected exception + assertTrue(_queueMBean.getCapacity() == 2); + } + + //set (FlowResume)Capacity to MESSAGE_SIZE +1 then add a message to the queue + _queueMBean.setCapacity(MESSAGE_SIZE + 1); + _queueMBean.setFlowResumeCapacity(MESSAGE_SIZE + 1); + + AMQChannel channel = new AMQChannel(_protocolSession, 1, _messageStore); + sendMessages(1, true); + _queue.checkCapacity(channel); + + assertFalse(_queueMBean.isFlowOverfull()); + assertFalse(channel.getBlocking()); + + //add another message then check queue is now overfull and channel blocked + sendMessages(1, true); + _queue.checkCapacity(channel); + + assertTrue(_queueMBean.isFlowOverfull()); + assertTrue(channel.getBlocking()); + + //set FlowResumeCapacity to 2x MESSAGE_SIZE and check queue is now underfull and channel unblocked + _queueMBean.setCapacity(2 * MESSAGE_SIZE);//must increase capacity too + _queueMBean.setFlowResumeCapacity(2 * MESSAGE_SIZE); + + assertFalse(_queueMBean.isFlowOverfull()); + assertFalse(channel.getBlocking()); + } private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException { |
