summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java61
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java129
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java4
-rw-r--r--java/test-profiles/08InVMExcludes3
4 files changed, 182 insertions, 15 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 3bb8d397be..d6bdacee86 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -318,6 +318,67 @@ public class AMQQueueMBeanTest extends TestCase
}
}
+
+ public void testFlowControlProperties() throws Exception
+ {
+ assertTrue(_queueMBean.getCapacity() == 0);
+ assertTrue(_queueMBean.getFlowResumeCapacity() == 0);
+ assertFalse(_queueMBean.isFlowOverfull());
+
+ //capacity currently 0, try setting FlowResumeCapacity above this
+ try
+ {
+ _queueMBean.setFlowResumeCapacity(1L);
+ fail("Should have failed to allow setting FlowResumeCapacity above Capacity");
+ }
+ catch (IllegalArgumentException ex)
+ {
+ //expected exception
+ assertTrue(_queueMBean.getFlowResumeCapacity() == 0);
+ }
+
+ //(FlowResume)Capacity currently 0, set both to 2 then try setting Capacity below this
+ _queueMBean.setCapacity(2L);
+ assertTrue(_queueMBean.getCapacity() == 2L);
+ _queueMBean.setFlowResumeCapacity(2L);
+ assertTrue(_queueMBean.getFlowResumeCapacity() == 2L);
+
+ try
+ {
+ _queueMBean.setCapacity(1L);
+ fail("Should have failed to allow setting Capacity below FlowResumeCapacity");
+ }
+ catch (IllegalArgumentException ex)
+ {
+ //expected exception
+ assertTrue(_queueMBean.getCapacity() == 2);
+ }
+
+ //set (FlowResume)Capacity to MESSAGE_SIZE +1 then add a message to the queue
+ _queueMBean.setCapacity(MESSAGE_SIZE + 1);
+ _queueMBean.setFlowResumeCapacity(MESSAGE_SIZE + 1);
+
+ AMQChannel channel = new AMQChannel(_protocolSession, 1, _messageStore);
+ sendMessages(1, true);
+ _queue.checkCapacity(channel);
+
+ assertFalse(_queueMBean.isFlowOverfull());
+ assertFalse(channel.getBlocking());
+
+ //add another message then check queue is now overfull and channel blocked
+ sendMessages(1, true);
+ _queue.checkCapacity(channel);
+
+ assertTrue(_queueMBean.isFlowOverfull());
+ assertTrue(channel.getBlocking());
+
+ //set FlowResumeCapacity to 2x MESSAGE_SIZE and check queue is now underfull and channel unblocked
+ _queueMBean.setCapacity(2 * MESSAGE_SIZE);//must increase capacity too
+ _queueMBean.setFlowResumeCapacity(2 * MESSAGE_SIZE);
+
+ assertFalse(_queueMBean.isFlowOverfull());
+ assertFalse(channel.getBlocking());
+ }
private IncomingMessage message(final boolean immediate, boolean persistent) throws AMQException
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index c4c38e0ece..e6be7c8263 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -25,7 +25,9 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.AMQException;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
@@ -43,8 +45,6 @@ public class ProducerFlowControlTest extends AbstractTestLogging
private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
- protected final String QUEUE = "ProducerFlowControl";
-
private static final int MSG_COUNT = 50;
private Connection producerConnection;
@@ -54,12 +54,18 @@ public class ProducerFlowControlTest extends AbstractTestLogging
private Connection consumerConnection;
private Session consumerSession;
-
private MessageConsumer consumer;
private final AtomicInteger _sentMessages = new AtomicInteger();
+ private JMXTestUtils _jmxUtils;
+ private boolean _jmxUtilConnected;
+ private static final String USER = "admin";
+
public void setUp() throws Exception
{
+ _jmxUtils = new JMXTestUtils(this, USER , USER);
+ _jmxUtils.setUp();
+ _jmxUtilConnected=false;
super.setUp();
_monitor.reset();
@@ -76,6 +82,17 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void tearDown() throws Exception
{
+ if(_jmxUtilConnected)
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
producerConnection.close();
consumerConnection.close();
super.tearDown();
@@ -84,11 +101,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testCapacityExceededCausesBlock()
throws JMSException, NamingException, AMQException, InterruptedException
{
+ String queueName = getTestQueueName();
+
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -124,11 +143,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testBrokerLogMessages()
throws JMSException, NamingException, AMQException, InterruptedException, IOException
{
+ String queueName = getTestQueueName();
+
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -161,6 +182,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testClientLogMessages()
throws JMSException, NamingException, AMQException, InterruptedException, IOException
{
+ String queueName = getTestQueueName();
+
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
@@ -170,8 +193,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) session).declareAndBind((AMQDestination)queue);
producer = session.createProducer(queue);
@@ -195,11 +218,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testFlowControlOnCapacityResumeEqual()
throws JMSException, NamingException, AMQException, InterruptedException
{
+ String queueName = getTestQueueName();
+
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",1000);
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -229,6 +254,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testFlowControlSoak()
throws Exception, NamingException, AMQException, InterruptedException
{
+ String queueName = getTestQueueName();
+
_sentMessages.set(0);
final int numProducers = 10;
final int numMessages = 100;
@@ -237,9 +264,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging
arguments.put("x-qpid-capacity",6000);
arguments.put("x-qpid-flow-resume-capacity",3000);
- ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments);
+ ((AMQSession) consumerSession).createQueue(new AMQShortString(queueName), false, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
consumerConnection.start();
@@ -285,6 +312,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging
public void testSendTimeout()
throws JMSException, NamingException, AMQException, InterruptedException
{
+ String queueName = getTestQueueName();
+
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -292,8 +321,8 @@ public class ProducerFlowControlTest extends AbstractTestLogging
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) session).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) session).declareAndBind((AMQDestination)queue);
producer = session.createProducer(queue);
@@ -310,6 +339,76 @@ public class ProducerFlowControlTest extends AbstractTestLogging
assertNotNull("No timeout exception on sending", e);
}
+
+
+ public void testFlowControlAttributeModificationViaJMX()
+ throws JMSException, NamingException, AMQException, InterruptedException, Exception
+ {
+ _jmxUtils.open();
+ _jmxUtilConnected = true;
+
+ String queueName = getTestQueueName();
+
+ //create queue
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",0);
+ arguments.put("x-qpid-flow-resume-capacity",0);
+ ((AMQSession) producerSession).createQueue(new AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ Thread.sleep(1000);
+
+ //Create a JMX MBean proxy for the queue
+ ManagedQueue queueMBean = _jmxUtils.getManagedObject(ManagedQueue.class, _jmxUtils.getQueueObjectName("test", queueName));
+ assertNotNull(queueMBean);
+
+ //check current attribute values are 0 as expected
+ assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 0L);
+ assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 0L);
+
+ //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent
+ queueMBean.setCapacity(250L);
+ queueMBean.setFlowResumeCapacity(250L);
+ assertTrue("Capacity was not the expected value", queueMBean.getCapacity() == 250L);
+ assertTrue("FlowResumeCapacity was not the expected value", queueMBean.getFlowResumeCapacity() == 250L);
+ assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
+
+ // try to send 2 messages (should block after 1)
+ _sentMessages.set(0);
+ sendMessagesAsync(producer, producerSession, 2, 50L);
+
+ Thread.sleep(2000);
+
+ //check only 1 message was sent, and queue is overfull
+ assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get());
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //raise the attribute values, causing the queue to become underfull and allow the second message to be sent.
+ queueMBean.setCapacity(300L);
+ queueMBean.setFlowResumeCapacity(300L);
+
+ Thread.sleep(2000);
+
+ //check second message was sent, and caused the queue to become overfull again
+ assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get());
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded
+ queueMBean.setCapacity(700L);
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //receive a message, check queue becomes underfull
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ consumer.receive();
+ assertFalse("Queue should not be overfull", queueMBean.isFlowOverfull());
+
+ consumer.receive();
+ }
private MessageSender sendMessagesAsync(final MessageProducer producer,
final Session producerSession,
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
index 20ef2ebfba..ca59a0536b 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
@@ -119,6 +119,7 @@ public class JMXTestUtils
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number test vhosts returned", 1, objectNames.size());
// We have verified we have only one value in objectNames so return it
@@ -142,6 +143,7 @@ public class JMXTestUtils
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number of queues with name '" + allObject.querystring +
"' returned", 1, objectNames.size());
@@ -167,6 +169,7 @@ public class JMXTestUtils
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number of exchange with name '" + exchange +
"' returned", 1, objectNames.size());
@@ -181,6 +184,7 @@ public class JMXTestUtils
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size());
ObjectName objectName = objectNames.iterator().next();
diff --git a/java/test-profiles/08InVMExcludes b/java/test-profiles/08InVMExcludes
index 8e783a3f00..065076d3b6 100644
--- a/java/test-profiles/08InVMExcludes
+++ b/java/test-profiles/08InVMExcludes
@@ -1,3 +1,6 @@
//======================================================================
//Exclude the following tests when running the InVM default test profile
//======================================================================
+
+// QPID-2097 exclude until InVM JMX test runs are reliable
+org.apache.qpid.server.queue.ProducerFlowControlTest#testFlowControlAttributeModificationViaJMX