summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2009-12-08 04:05:04 +0000
committerRobert Gemmell <robbie@apache.org>2009-12-08 04:05:04 +0000
commit10adf40dcbe9378662b365de0ac57c57296d026b (patch)
treec9853783adce0af133eba2881879f317298045dd /java/systests/src
parentcb3edd774a322d0b62776ff25f519ecfb08bd77f (diff)
downloadqpid-python-10adf40dcbe9378662b365de0ac57c57296d026b.tar.gz
QPID-2177: unit and system testing for the new flow controlled related attributes of the Queue MBean
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@888250 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java129
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java4
2 files changed, 118 insertions, 15 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index c4c38e0ece..e6be7c8263 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -25,7 +25,9 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.AMQException;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
@@ -43,8 +45,6 @@ public class ProducerFlowControlTest extends AbstractTestLogging
private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
- protected final String QUEUE = "ProducerFlowControl";
-
private static final int MSG_COUNT = 50;
private Connection producerConnection;
@@ -54,12 +54,18 @@ public class ProducerFlowControlTest extends AbstractTestLogging
private Connection consumerConnection;
private Session consumerSession;
-
private MessageConsumer consumer;
private final AtomicInteger _sentMessages = new AtomicInteger();
+ private JMXTestUtils _jmxUtils;
+ private boolean _jmxUtilConnected;
+ private static final String USER = "admin";
+
public void setUp() throws Exception
{
+ _jmxUtils = new JMXTestUtils(this, USER , USER);
+ _jmxUtils.setUp();
+ _jmxUtilConnected=false;
super.setUp();
_monitor.reset();
@@ -76,6 +82,17 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void tearDown() throws Exception
{
+ if(_jmxUtilConnected)
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
producerConnection.close();
consumerConnection.close();
super.tearDown();
@@ -84,11 +101,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testCapacityExceededCausesBlock()
throws JMSException, NamingException, AMQException, InterruptedException
{
+ String queueName = getTestQueueName();
+
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -124,11 +143,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testBrokerLogMessages()
throws JMSException, NamingException, AMQException, InterruptedException, IOException
{
+ String queueName = getTestQueueName();
+
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -161,6 +182,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testClientLogMessages()
throws JMSException, NamingException, AMQException, InterruptedException, IOException
{
+ String queueName = getTestQueueName();
+
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
@@ -170,8 +193,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) session).declareAndBind((AMQDestination)queue);
producer = session.createProducer(queue);
@@ -195,11 +218,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testFlowControlOnCapacityResumeEqual()
throws JMSException, NamingException, AMQException, InterruptedException
{
+ String queueName = getTestQueueName();
+
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",1000);
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -229,6 +254,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testFlowControlSoak()
throws Exception, NamingException, AMQException, InterruptedException
{
+ String queueName = getTestQueueName();
+
_sentMessages.set(0);
final int numProducers = 10;
final int numMessages = 100;
@@ -237,9 +264,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging
arguments.put("x-qpid-capacity",6000);
arguments.put("x-qpid-flow-resume-capacity",3000);
- ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments);
+ ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
consumerConnection.start();
@@ -285,6 +312,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testSendTimeout()
throws JMSException, NamingException, AMQException, InterruptedException
{
+ String queueName = getTestQueueName();
+
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -292,8 +321,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) session).declareAndBind((AMQDestination)queue);
producer = session.createProducer(queue);
@@ -310,6 +339,76 @@ public class ProducerFlowControlTest extends AbstractTestLogging
assertNotNull("No timeout exception on sending", e);
}
+
+
+ public void testFlowControlAttributeModificationViaJMX()
+ throws JMSException, NamingException, AMQException, InterruptedException, Exception
+ {
+ _jmxUtils.open();
+ _jmxUtilConnected = true;
+
+ String queueName = getTestQueueName();
+
+ //create queue
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",0);
+ arguments.put("x-qpid-flow-resume-capacity",0);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ Thread.sleep(1000);
+
+ //Create a JMX MBean proxy for the queue
+ ManagedQueue queueMBean = _jmxUtils.getManagedObject(ManagedQueue.class, _jmxUtils.getQueueObjectName("test", queueName));
+ assertNotNull(queueMBean);
+
+ //check current attribute values are 0 as expected
+ assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 0L);
+ assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 0L);
+
+ //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent
+ queueMBean.setCapacity(250L);
+ queueMBean.setFlowResumeCapacity(250L);
+ assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 250L);
+ assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 250L);
+ assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
+
+ // try to send 2 messages (should block after 1)
+ _sentMessages.set(0);
+ sendMessagesAsync(producer, producerSession, 2, 50L);
+
+ Thread.sleep(2000);
+
+ //check only 1 message was sent, and queue is overfull
+ assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get());
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //raise the attribute values, causing the queue to become underfull and allow the second message to be sent.
+ queueMBean.setCapacity(300L);
+ queueMBean.setFlowResumeCapacity(300L);
+
+ Thread.sleep(2000);
+
+ //check second message was sent, and caused the queue to become overfull again
+ assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get());
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded
+ queueMBean.setCapacity(700L);
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //receive a message, check queue becomes underfull
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ consumer.receive();
+ assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
+
+ consumer.receive();
+ }
private MessageSender sendMessagesAsync(final MessageProducer producer,
final Session producerSession,
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
index 20ef2ebfba..ca59a0536b 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
@@ -119,6 +119,7 @@ public class JMXTestUtils
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number test vhosts returned", 1, objectNames.size());
// We have verified we have only one value in objectNames so return it
@@ -142,6 +143,7 @@ public class JMXTestUtils
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number of queues with name '" + allObject.querystring +
"' returned", 1, objectNames.size());
@@ -167,6 +169,7 @@ public class JMXTestUtils
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number of exchange with name '" + exchange +
"' returned", 1, objectNames.size());
@@ -181,6 +184,7 @@ public class JMXTestUtils
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size());
ObjectName objectName = objectNames.iterator().next();