diff options
| author | Robert Gemmell <robbie@apache.org> | 2009-12-08 04:05:04 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2009-12-08 04:05:04 +0000 |
| commit | 10adf40dcbe9378662b365de0ac57c57296d026b (patch) | |
| tree | c9853783adce0af133eba2881879f317298045dd /java/systests/src | |
| parent | cb3edd774a322d0b62776ff25f519ecfb08bd77f (diff) | |
| download | qpid-python-10adf40dcbe9378662b365de0ac57c57296d026b.tar.gz | |
QPID-2177: unit and system testing for the new flow controlled related attributes of the Queue MBean
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@888250 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java | 129 | ||||
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java | 4 |
2 files changed, 118 insertions, 15 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index c4c38e0ece..e6be7c8263 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -25,7 +25,9 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.AMQException; +import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.logging.AbstractTestLogging; +import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.framing.AMQShortString; import javax.jms.*; @@ -43,8 +45,6 @@ public class ProducerFlowControlTest extends AbstractTestLogging private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class); - protected final String QUEUE = "ProducerFlowControl"; - private static final int MSG_COUNT = 50; private Connection producerConnection; @@ -54,12 +54,18 @@ public class ProducerFlowControlTest extends AbstractTestLogging private Connection consumerConnection; private Session consumerSession; - private MessageConsumer consumer; private final AtomicInteger _sentMessages = new AtomicInteger(); + private JMXTestUtils _jmxUtils; + private boolean _jmxUtilConnected; + private static final String USER = "admin"; + public void setUp() throws Exception { + _jmxUtils = new JMXTestUtils(this, USER , USER); + _jmxUtils.setUp(); + _jmxUtilConnected=false; super.setUp(); _monitor.reset(); @@ -76,6 +82,17 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void tearDown() throws Exception { + if(_jmxUtilConnected) + { + try + { + _jmxUtils.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } + } producerConnection.close(); consumerConnection.close(); super.tearDown(); @@ -84,11 +101,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testCapacityExceededCausesBlock() throws JMSException, NamingException, AMQException, InterruptedException { + String queueName = getTestQueueName(); + final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -124,11 +143,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testBrokerLogMessages() throws JMSException, NamingException, AMQException, InterruptedException, IOException { + String queueName = getTestQueueName(); + final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -161,6 +182,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testClientLogMessages() throws JMSException, NamingException, AMQException, InterruptedException, IOException { + String queueName = getTestQueueName(); + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000"); @@ -170,8 +193,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) session).declareAndBind((AMQDestination)queue); producer = session.createProducer(queue); @@ -195,11 +218,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testFlowControlOnCapacityResumeEqual() throws JMSException, NamingException, AMQException, InterruptedException { + String queueName = getTestQueueName(); + final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",1000); - ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); @@ -229,6 +254,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testFlowControlSoak() throws Exception, NamingException, AMQException, InterruptedException { + String queueName = getTestQueueName(); + _sentMessages.set(0); final int numProducers = 10; final int numMessages = 100; @@ -237,9 +264,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging arguments.put("x-qpid-capacity",6000); arguments.put("x-qpid-flow-resume-capacity",3000); - ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments); + ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue); consumerConnection.start(); @@ -285,6 +312,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testSendTimeout() throws JMSException, NamingException, AMQException, InterruptedException { + String queueName = getTestQueueName(); + setTestClientSystemProperty("qpid.flow_control_wait_failure","3000"); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -292,8 +321,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",1000); arguments.put("x-qpid-flow-resume-capacity",800); - ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); - queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); ((AMQSession) session).declareAndBind((AMQDestination)queue); producer = session.createProducer(queue); @@ -310,6 +339,76 @@ public class ProducerFlowControlTest extends AbstractTestLogging assertNotNull("No timeout exception on sending", e); } + + + public void testFlowControlAttributeModificationViaJMX() + throws JMSException, NamingException, AMQException, InterruptedException, Exception + { + _jmxUtils.open(); + _jmxUtilConnected = true; + + String queueName = getTestQueueName(); + + //create queue + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",0); + arguments.put("x-qpid-flow-resume-capacity",0); + ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments); + queue = new AMQQueue("amq.direct",queueName); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + Thread.sleep(1000); + + //Create a JMX MBean proxy for the queue + ManagedQueue queueMBean = _jmxUtils.getManagedObject(ManagedQueue.class, _jmxUtils.getQueueObjectName("test", queueName)); + assertNotNull(queueMBean); + + //check current attribute values are 0 as expected + assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 0L); + assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 0L); + + //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent + queueMBean.setCapacity(250L); + queueMBean.setFlowResumeCapacity(250L); + assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 250L); + assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 250L); + assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); + + // try to send 2 messages (should block after 1) + _sentMessages.set(0); + sendMessagesAsync(producer, producerSession, 2, 50L); + + Thread.sleep(2000); + + //check only 1 message was sent, and queue is overfull + assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get()); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //raise the attribute values, causing the queue to become underfull and allow the second message to be sent. + queueMBean.setCapacity(300L); + queueMBean.setFlowResumeCapacity(300L); + + Thread.sleep(2000); + + //check second message was sent, and caused the queue to become overfull again + assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get()); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded + queueMBean.setCapacity(700L); + assertTrue("Queue should be overfull", queueMBean.isFlowOverfull()); + + //receive a message, check queue becomes underfull + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + consumer.receive(); + assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull()); + + consumer.receive(); + } private MessageSender sendMessagesAsync(final MessageProducer producer, final Session producerSession, diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java index 20ef2ebfba..ca59a0536b 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java @@ -119,6 +119,7 @@ public class JMXTestUtils Set<ObjectName> objectNames = allObject.returnObjects(); + _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("Incorrect number test vhosts returned", 1, objectNames.size()); // We have verified we have only one value in objectNames so return it @@ -142,6 +143,7 @@ public class JMXTestUtils Set<ObjectName> objectNames = allObject.returnObjects(); + _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("Incorrect number of queues with name '" + allObject.querystring + "' returned", 1, objectNames.size()); @@ -167,6 +169,7 @@ public class JMXTestUtils Set<ObjectName> objectNames = allObject.returnObjects(); + _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("Incorrect number of exchange with name '" + exchange + "' returned", 1, objectNames.size()); @@ -181,6 +184,7 @@ public class JMXTestUtils Set<ObjectName> objectNames = allObject.returnObjects(); + _test.assertNotNull("Null ObjectName Set returned", objectNames); _test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size()); ObjectName objectName = objectNames.iterator().next(); |
