From 727cc60c075632329c345086d21cd12355fa0234 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 7 Oct 2009 22:28:45 +0000 Subject: QPID-942 : Added tests for broker and client log messages produced when flow control invoked git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@822949 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/queue/ProducerFlowControlTest.java | 85 +++++++++++++++++++++- 1 file changed, 81 insertions(+), 4 deletions(-) (limited to 'java/systests/src') diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index 97147904e1..02db144694 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -26,15 +26,18 @@ import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.AbstractTestLogging; import org.apache.qpid.framing.AMQShortString; import javax.jms.*; import javax.naming.NamingException; import java.util.HashMap; import java.util.Map; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.io.IOException; -public class ProducerFlowControlTest extends QpidTestCase +public class ProducerFlowControlTest extends AbstractTestLogging { private static final int TIMEOUT = 1500; @@ -56,10 +59,12 @@ public class ProducerFlowControlTest extends QpidTestCase private MessageConsumer consumer; private final AtomicInteger _sentMessages = new AtomicInteger(); - protected void setUp() throws Exception + public void setUp() throws Exception { super.setUp(); + _monitor.reset(); + producerConnection = getConnection(); producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -70,7 +75,7 @@ public class ProducerFlowControlTest extends QpidTestCase } - protected void tearDown() throws Exception + public void tearDown() throws Exception { producerConnection.close(); consumerConnection.close(); @@ -117,6 +122,79 @@ public class ProducerFlowControlTest extends QpidTestCase } + public void testBrokerLogMessages() + throws JMSException, NamingException, AMQException, InterruptedException, IOException + { + final Map arguments = new HashMap(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + List results = _monitor.findMatches("QUE-1003"); + + assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + while(consumer.receive(1000) != null); + + results = _monitor.findMatches("QUE-1004"); + + assertEquals("Did not find correct number of QUE_1004 queue underfull messages", 1, results.size()); + + + + } + + + public void testClientLogMessages() + throws JMSException, NamingException, AMQException, InterruptedException, IOException + { + long origTimeoutValue = Long.getLong("qpid.flow_control_wait_failure",AMQSession.DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + System.setProperty("qpid.flow_control_wait_failure","3000"); + System.setProperty("qpid.flow_control_wait_notify_period","1000"); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + final Map arguments = new HashMap(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).declareAndBind((AMQDestination)queue); + producer = session.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(10000); + List results = _monitor.findMatches("Message send delayed by"); + assertEquals("Incorrect number of delay messages logged by client",3,results.size()); + results = _monitor.findMatches("Message send failed due to timeout waiting on broker enforced flow control"); + assertEquals("Incorrect number of send failure messages logged by client",1,results.size()); + + System.setProperty("qpid.flow_control_wait_failure",String.valueOf(origTimeoutValue)); + System.setProperty("qpid.flow_control_wait_notify_period","5000"); + + + } + public void testFlowControlOnCapacityResumeEqual() throws JMSException, NamingException, AMQException, InterruptedException @@ -131,7 +209,6 @@ public class ProducerFlowControlTest extends QpidTestCase _sentMessages.set(0); - // try to send 5 messages (should block after 4) sendMessagesAsync(producer, producerSession, 5, 50L); -- cgit v1.2.1