summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-05-07 22:40:52 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-05-07 22:40:52 +0000
commit9eab96a9a3569486f6351c94abf4f95ed515e9b1 (patch)
treeae86cedd9fdcea4f49993e5a82954ccda53a1ed3 /qpid/java/systests/src
parent1427de0275b5db2c8619db9211435897123259d8 (diff)
downloadqpid-python-9eab96a9a3569486f6351c94abf4f95ed515e9b1.tar.gz
QPID-3986 : [Java Broker] Add producer flow control based on total disk usage
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1335290 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java347
1 files changed, 347 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
new file mode 100644
index 0000000000..283fb4ed4c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
@@ -0,0 +1,347 @@
+package org.apache.qpid.server.store;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class StoreOverfullTest extends QpidBrokerTestCase
+{
+ private static final int TIMEOUT = 10000;
+ public static final int TEST_SIZE = 150;
+
+ private Connection _producerConnection;
+ private Connection _consumerConnection;
+ private Session _producerSession;
+ private Session _consumerSession;
+ private MessageProducer _producer;
+ private MessageConsumer _consumer;
+ private Queue _queue;
+
+ //private final AtomicInteger sentMessages = new AtomicInteger(0);
+
+ private static final int OVERFULL_SIZE = 4000000;
+ private static final int UNDERFULL_SIZE = 3500000;
+
+ public void setUp() throws Exception
+ {
+ setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size",
+ String.valueOf(OVERFULL_SIZE));
+ setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size",
+ String.valueOf(UNDERFULL_SIZE));
+ setSystemProperty("qpid.bdb.envconfig.je.log.fileMax", "1000000");
+ super.setUp();
+
+ _producerConnection = getConnection();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producerConnection.start();
+
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ _producerConnection.close();
+ _consumerConnection.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ /*
+ * Test:
+ *
+ * Send > threshold amount of data : Sender is blocked
+ * Remove 90% of data : Sender is unblocked
+ *
+ */
+ public void testCapacityExceededCausesBlock() throws Exception
+ {
+ AtomicInteger sentMessages = new AtomicInteger(0);
+ _queue = getTestQueue();
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
+
+ _producer = _producerSession.createProducer(_queue);
+
+ sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
+
+ while(!((AMQSession)_producerSession).isBrokerFlowControlled())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount = sentMessages.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ int mostMessages = (int) (0.9 * sentCount);
+ for(int i = 0; i < mostMessages; i++)
+ {
+ if(_consumer.receive(1000l) == null)
+ {
+ break;
+ }
+ }
+
+ long targetTime = System.currentTimeMillis() + 5000l;
+ while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime)
+ {
+ Thread.sleep(100l);
+ }
+
+ assertFalse("Did not unblock on consuming messages", sentMessages.get() == sentCount);
+
+ for(int i = mostMessages; i < TEST_SIZE; i++)
+ {
+ if(_consumer.receive(1000l) == null)
+ {
+ break;
+ }
+ }
+
+ assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE);
+
+ }
+
+ /* Two producers on different queues
+ */
+
+ public void testCapacityExceededCausesBlockTwoConnections() throws Exception
+ {
+ AtomicInteger sentMessages = new AtomicInteger(0);
+ AtomicInteger sentMessages2 = new AtomicInteger(0);
+
+ _queue = getTestQueue();
+ AMQQueue queue2 = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName() + "_2");
+
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
+
+ _producer = _producerSession.createProducer(_queue);
+
+ Connection secondProducerConnection = getConnection();
+ Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer secondProducer = secondProducerSession.createProducer(queue2);
+
+ sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
+ sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2);
+
+ while(!((AMQSession)_producerSession).isBrokerFlowControlled())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount = sentMessages.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
+
+
+ while(!((AMQSession)secondProducerSession).isBrokerFlowControlled())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount2 = sentMessages2.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2);
+
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ MessageConsumer consumer2 = _consumerSession.createConsumer(queue2);
+ _consumerConnection.start();
+
+
+ for(int i = 0; i < 2*TEST_SIZE; i++)
+ {
+ if(_consumer.receive(1000l) == null
+ && consumer2.receive(1000l) == null)
+ {
+ break;
+ }
+ }
+
+ assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get());
+ assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get());
+ }
+
+ /*
+ * New producers are blocked
+ */
+
+ public void testCapacityExceededCausesBlockNewConnection() throws Exception
+ {
+ AtomicInteger sentMessages = new AtomicInteger(0);
+ AtomicInteger sentMessages2 = new AtomicInteger(0);
+
+ _queue = getTestQueue();
+
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
+
+ _producer = _producerSession.createProducer(_queue);
+
+ Connection secondProducerConnection = getConnection();
+ Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer secondProducer = secondProducerSession.createProducer(_queue);
+
+ sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
+
+ while(!((AMQSession)_producerSession).isBrokerFlowControlled())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount = sentMessages.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
+
+ sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2);
+
+ while(!((AMQSession)secondProducerSession).isBrokerFlowControlled())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount2 = sentMessages2.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2);
+
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+
+ for(int i = 0; i < 2*TEST_SIZE; i++)
+ {
+ if(_consumer.receive(2000l) == null)
+ {
+ break;
+ }
+ }
+
+ assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get());
+ assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get());
+
+ }
+
+
+
+ private MessageSender sendMessagesAsync(final MessageProducer producer,
+ final Session producerSession,
+ final int numMessages,
+ long sleepPeriod,
+ AtomicInteger sentMessages)
+ {
+ MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod, sentMessages);
+ new Thread(sender).start();
+ return sender;
+ }
+
+ private class MessageSender implements Runnable
+ {
+ private final MessageProducer _senderProducer;
+ private final Session _senderSession;
+ private final int _numMessages;
+ private volatile JMSException _exception;
+ private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
+ private long _sleepPeriod;
+ private final AtomicInteger _sentMessages;
+
+ public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages)
+ {
+ _senderProducer = producer;
+ _senderSession = producerSession;
+ _numMessages = numMessages;
+ _sleepPeriod = sleepPeriod;
+ _sentMessages = sentMessages;
+ }
+
+ public void run()
+ {
+ try
+ {
+ sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod, _sentMessages);
+ }
+ catch (JMSException e)
+ {
+ _exception = e;
+ _exceptionThrownLatch.countDown();
+ }
+ }
+
+ public Exception awaitSenderException(long timeout) throws InterruptedException
+ {
+ _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS);
+ return _exception;
+ }
+ }
+
+ private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages)
+ throws JMSException
+ {
+
+ for (int msg = 0; msg < numMessages; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ sentMessages.incrementAndGet();
+
+
+ try
+ {
+ ((AMQSession<?,?>)producerSession).sync();
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ Thread.sleep(sleepPeriod);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static final byte[] BYTE_32K = new byte[32*1024];
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ BytesMessage send = producerSession.createBytesMessage();
+ send.writeBytes(BYTE_32K);
+ send.setIntProperty("msg", msg);
+
+ return send;
+ }
+}