diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-05-07 22:40:52 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-05-07 22:40:52 +0000 |
| commit | 9eab96a9a3569486f6351c94abf4f95ed515e9b1 (patch) | |
| tree | ae86cedd9fdcea4f49993e5a82954ccda53a1ed3 /qpid/java/systests/src | |
| parent | 1427de0275b5db2c8619db9211435897123259d8 (diff) | |
| download | qpid-python-9eab96a9a3569486f6351c94abf4f95ed515e9b1.tar.gz | |
QPID-3986 : [Java Broker] Add producer flow control based on total disk usage
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1335290 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
| -rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java new file mode 100644 index 0000000000..283fb4ed4c --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java @@ -0,0 +1,347 @@ +package org.apache.qpid.server.store; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class StoreOverfullTest extends QpidBrokerTestCase +{ + private static final int TIMEOUT = 10000; + public static final int TEST_SIZE = 150; + + private Connection _producerConnection; + private Connection _consumerConnection; + private Session _producerSession; + private Session _consumerSession; + private MessageProducer _producer; + private MessageConsumer _consumer; + private Queue _queue; + + //private final AtomicInteger sentMessages = new AtomicInteger(0); + + private static final int OVERFULL_SIZE = 4000000; + private static final int UNDERFULL_SIZE = 3500000; + + public void setUp() throws Exception + { + setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size", + String.valueOf(OVERFULL_SIZE)); + setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size", + String.valueOf(UNDERFULL_SIZE)); + setSystemProperty("qpid.bdb.envconfig.je.log.fileMax", "1000000"); + super.setUp(); + + _producerConnection = getConnection(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producerConnection.start(); + + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } + + public void tearDown() throws Exception + { + try + { + _producerConnection.close(); + _consumerConnection.close(); + } + finally + { + super.tearDown(); + } + } + + /* + * Test: + * + * Send > threshold amount of data : Sender is blocked + * Remove 90% of data : Sender is unblocked + * + */ + public void testCapacityExceededCausesBlock() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + _queue = getTestQueue(); + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + + while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + int mostMessages = (int) (0.9 * sentCount); + for(int i = 0; i < mostMessages; i++) + { + if(_consumer.receive(1000l) == null) + { + break; + } + } + + long targetTime = System.currentTimeMillis() + 5000l; + while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime) + { + Thread.sleep(100l); + } + + assertFalse("Did not unblock on consuming messages", sentMessages.get() == sentCount); + + for(int i = mostMessages; i < TEST_SIZE; i++) + { + if(_consumer.receive(1000l) == null) + { + break; + } + } + + assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE); + + } + + /* Two producers on different queues + */ + + public void testCapacityExceededCausesBlockTwoConnections() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + AtomicInteger sentMessages2 = new AtomicInteger(0); + + _queue = getTestQueue(); + AMQQueue queue2 = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName() + "_2"); + + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + Connection secondProducerConnection = getConnection(); + Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer secondProducer = secondProducerSession.createProducer(queue2); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); + + while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + + while(!((AMQSession)secondProducerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount2 = sentMessages2.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); + + + _consumer = _consumerSession.createConsumer(_queue); + MessageConsumer consumer2 = _consumerSession.createConsumer(queue2); + _consumerConnection.start(); + + + for(int i = 0; i < 2*TEST_SIZE; i++) + { + if(_consumer.receive(1000l) == null + && consumer2.receive(1000l) == null) + { + break; + } + } + + assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get()); + assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); + } + + /* + * New producers are blocked + */ + + public void testCapacityExceededCausesBlockNewConnection() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + AtomicInteger sentMessages2 = new AtomicInteger(0); + + _queue = getTestQueue(); + + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + Connection secondProducerConnection = getConnection(); + Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer secondProducer = secondProducerSession.createProducer(_queue); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + + while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); + + while(!((AMQSession)secondProducerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount2 = sentMessages2.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); + + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + + for(int i = 0; i < 2*TEST_SIZE; i++) + { + if(_consumer.receive(2000l) == null) + { + break; + } + } + + assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get()); + assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); + + } + + + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages, + long sleepPeriod, + AtomicInteger sentMessages) + { + MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod, sentMessages); + new Thread(sender).start(); + return sender; + } + + private class MessageSender implements Runnable + { + private final MessageProducer _senderProducer; + private final Session _senderSession; + private final int _numMessages; + private volatile JMSException _exception; + private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1); + private long _sleepPeriod; + private final AtomicInteger _sentMessages; + + public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) + { + _senderProducer = producer; + _senderSession = producerSession; + _numMessages = numMessages; + _sleepPeriod = sleepPeriod; + _sentMessages = sentMessages; + } + + public void run() + { + try + { + sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod, _sentMessages); + } + catch (JMSException e) + { + _exception = e; + _exceptionThrownLatch.countDown(); + } + } + + public Exception awaitSenderException(long timeout) throws InterruptedException + { + _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS); + return _exception; + } + } + + private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) + throws JMSException + { + + for (int msg = 0; msg < numMessages; msg++) + { + producer.send(nextMessage(msg, producerSession)); + sentMessages.incrementAndGet(); + + + try + { + ((AMQSession<?,?>)producerSession).sync(); + } + catch (AMQException e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } + + try + { + Thread.sleep(sleepPeriod); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + private static final byte[] BYTE_32K = new byte[32*1024]; + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + BytesMessage send = producerSession.createBytesMessage(); + send.writeBytes(BYTE_32K); + send.setIntProperty("msg", msg); + + return send; + } +} |
