diff options
Diffstat (limited to 'java')
4 files changed, 182 insertions, 15 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 3bb8d397be..d6bdacee86 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -318,6 +318,67 @@ public class AMQQueueMBeanTest extends TestCase } } + + public void testFlowControlProperties() throws Exception + { + assertTrue(_queueMBean.getCapacity() == 0); + assertTrue(_queueMBean.getFlowResumeCapacity() == 0); + assertFalse(_queueMBean.isFlowOverfull()); + + //capacity currently 0, try setting FlowResumeCapacity above this + try + { + _queueMBean.setFlowResumeCapacity(1L); + fail("Should have failed to allow setting FlowResumeCapacity above Capacity"); + } + catch (IllegalArgumentException ex) + { + //expected exception + assertTrue(_queueMBean.getFlowResumeCapacity() == 0); + } + + //(FlowResume)Capacity currently 0, set both to 2 then try setting Capacity below this + _queueMBean.setCapacity(2L); + assertTrue(_queueMBean.getCapacity() == 2L); + _queueMBean.setFlowResumeCapacity(2L); + assertTrue(_queueMBean.getFlowResumeCapacity() == 2L); + + try + { + _queueMBean.setCapacity(1L); + fail("Should have failed to allow setting Capacity below FlowResumeCapacity"); + } + catch (IllegalArgumentException ex) + { + //expected exception + assertTrue(_queueMBean.getCapacity() == 2); + } + + //set (FlowResume)Capacity to MESSAGE_SIZE +1 then add a message to the queue + _queueMBean.setCapacity(MESSAGE_SIZE + 1); + _queueMBean.setFlowResumeCapacity(MESSAGE_SIZE + 1); + + AMQChannel channel = new AMQChannel(_protocolSession, 1, _messageStore); + sendMessages(1, true); + _queue.checkCapacity(channel); + + assertFalse(_queueMBean.isFlowOverfull()); + assertFalse(channel.getBlocking()); + + //add another message then check queue is now overfull and channel blocked + sendMessages(1, true); + _queue.checkCapacity(channel); + + assertTrue(_queueMBean.isFlowOverfull()); + assertTrue(channel.getBlocking()); + + //set FlowResumeCapacity to 2x MESSAGE_SIZE and check queue is now underfull and channel unblocked + _queueMBean.setCapacity(2 * MESSAGE_SIZE);//must increase capacity too + _queueMBean.setFlowResumeCapacity(2 * MESSAGE_SIZE); + + assertFalse(_queueMBean.isFlowOverfull()); + assertFalse(channel.getBlocking()); + } private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException { diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index c4c38e0ece..e6be7c8263 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -25,7 +25,9 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.AMQException; +import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.logging.AbstractTestLogging; +import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.framing.AMQShortString; import javax.jms.*; @@ -43,8 +45,6 @@ public class ProducerFlowControlTest extends AbstractTestLogging private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class); - protected final String QUEUE = "ProducerFlowControl"; - private static final int MSG_COUNT = 50; private Connection producerConnection; @@ -54,12 +54,18 @@ public class ProducerFlowControlTest extends AbstractTestLogging private Connection consumerConnection; private Session consumerSession; - private MessageConsumer consumer; private final AtomicInteger _sentMessages = new AtomicInteger(); + private JMXTestUtils _jmxUtils; + private boolean _jmxUtilConnected; + private static final String USER = "admin"; + public void setUp() throws Exception { + _jmxUtils = new JMXTestUtils(this, USER , USER); + _jmxUtils.setUp(); + _jmxUtilConnected=false; super.setUp(); _monitor.reset(); @@ -76,6 +82,17 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void tearDown() throws Exception { + if(_jmxUtilConnected) + { + try + { + _jmxUtils.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } + } producerConnection.close(); consumerConnection.close(); super.tearDown(); @@ -84,11 +101,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testCapacityExceededCausesBlock() throws JMSException, NamingException, AMQException, InterruptedException { + String queueName = getTestQueueName(); + final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -124,11 +143,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testBrokerLogMessages() throws JMSException, NamingException, AMQException, InterruptedException, IOException { + String queueName = getTestQueueName(); + final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -161,6 +182,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testClientLogMessages() throws JMSException, NamingException, AMQException, InterruptedException, IOException { + String queueName = getTestQueueName(); + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000"); @@ -170,8 +193,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) session).declareAndBind((AMQDestination)queue); producer = session.createProducer(queue); @@ -195,11 +218,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testFlowControlOnCapacityResumeEqual() throws JMSException, NamingException, AMQException, InterruptedException { + String queueName = getTestQueueName(); + final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",1000); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -229,6 +254,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testFlowControlSoak() throws Exception, NamingException, AMQException, InterruptedException { + String queueName = getTestQueueName(); + _sentMessages.set(0); final int numProducers = 10; final int numMessages = 100; @@ -237,9 +264,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",6000); arguments.put("x-qpid-flow-resume-capacity",3000); - ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments); + ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue); consumerConnection.start(); @@ -285,6 +312,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testSendTimeout() throws JMSException, NamingException, AMQException, InterruptedException { + String queueName = getTestQueueName(); + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -292,8 +321,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) session).declareAndBind((AMQDestination)queue); producer = session.createProducer(queue); @@ -310,6 +339,76 @@ public class ProducerFlowControlTest extends AbstractTestLogging assertNotNull("No timeout exception on sending", e); } + + + public void testFlowControlAttributeModificationViaJMX() + throws JMSException, NamingException, AMQException, InterruptedException, Exception + { + _jmxUtils.open(); + _jmxUtilConnected = true; + + String queueName = getTestQueueName(); + + //create queue + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",0); + arguments.put("x-qpid-flow-resume-capacity",0); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + Thread.sleep(1000); + + //Create a JMX MBean proxy for the queue + ManagedQueue queueMBean = _jmxUtils.getManagedObject(ManagedQueue.class, _jmxUtils.getQueueObjectName("test", queueName)); + assertNotNull(queueMBean); + + //check current attribute values are 0 as expected + assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 0L); + assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 0L); + + //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent + queueMBean.setCapacity(250L); + queueMBean.setFlowResumeCapacity(250L); + assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 250L); + assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 250L); + assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); + + // try to send 2 messages (should block after 1) + _sentMessages.set(0); + sendMessagesAsync(producer, producerSession, 2, 50L); + + Thread.sleep(2000); + + //check only 1 message was sent, and queue is overfull + assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get()); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //raise the attribute values, causing the queue to become underfull and allow the second message to be sent. + queueMBean.setCapacity(300L); + queueMBean.setFlowResumeCapacity(300L); + + Thread.sleep(2000); + + //check second message was sent, and caused the queue to become overfull again + assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get()); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded + queueMBean.setCapacity(700L); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //receive a message, check queue becomes underfull + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + consumer.receive(); + assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); + + consumer.receive(); + } private MessageSender sendMessagesAsync(final MessageProducer producer, final Session producerSession, diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java index 20ef2ebfba..ca59a0536b 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java @@ -119,6 +119,7 @@ public class JMXTestUtils Set<ObjectName> objectNames = allObject.returnObjects(); + _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("Incorrect number test vhosts returned", 1, objectNames.size()); // We have verified we have only one value in objectNames so return it @@ -142,6 +143,7 @@ public class JMXTestUtils Set<ObjectName> objectNames = allObject.returnObjects(); + _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("Incorrect number of queues with name '" + allObject.querystring + "' returned", 1, objectNames.size()); @@ -167,6 +169,7 @@ public class JMXTestUtils Set<ObjectName> objectNames = allObject.returnObjects(); + _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("Incorrect number of exchange with name '" + exchange + "' returned", 1, objectNames.size()); @@ -181,6 +184,7 @@ public class JMXTestUtils Set<ObjectName> objectNames = allObject.returnObjects(); + _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size()); ObjectName objectName = objectNames.iterator().next(); diff --git a/java/test-profiles/08InVMExcludes b/java/test-profiles/08InVMExcludes index 8e783a3f00..065076d3b6 100644 --- a/java/test-profiles/08InVMExcludes +++ b/java/test-profiles/08InVMExcludes @@ -1,3 +1,6 @@ //====================================================================== //Exclude the following tests when running the InVM default test profile //====================================================================== + +// QPID-2097 exclude until InVM JMX test runs are reliable +org.apache.qpid.server.queue.ProducerFlowControlTest#testFlowControlAttributeModificationViaJMX |
