diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-05-29 11:37:11 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-05-29 11:37:11 +0000 |
| commit | 5a888989bd28402e3271b05bcc32a7410f8d17a2 (patch) | |
| tree | 469fe7c0a7a484cf2663a70d289e342497303f0b /qpid/java/systests | |
| parent | 739f8964cdb5baf786759fb900928206a404d3fa (diff) | |
| download | qpid-python-5a888989bd28402e3271b05bcc32a7410f8d17a2.tar.gz | |
QPID-3986: Improved tests and resolved some potential thread-safety issues
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343675 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests')
| -rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java | 174 | ||||
| -rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java | 105 |
2 files changed, 221 insertions, 58 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java new file mode 100644 index 0000000000..2c029b4bf3 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -0,0 +1,174 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageContentSource; + +public class QuotaMessageStore extends NullMessageStore +{ + private final AtomicLong _messageId = new AtomicLong(1); + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private long _totalStoreSize;; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + + private final StateManager _stateManager; + private final EventManager _eventManager = new EventManager(); + + public QuotaMessageStore() + { + _stateManager = new StateManager(_eventManager); + } + + @Override + public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) + throws Exception + { + _persistentSizeHighThreshold = config.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE); + _persistentSizeLowThreshold = config.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, + _persistentSizeHighThreshold); + if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + _stateManager.attainState(State.INITIALISING); + } + + @Override + public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception + { + _stateManager.attainState(State.INITIALISED); + } + + @Override + public void activate() throws Exception + { + _stateManager.attainState(State.ACTIVATING); + _stateManager.attainState(State.ACTIVE); + } + + @SuppressWarnings("unchecked") + @Override + public StoredMessage<StorableMessageMetaData> addMessage(StorableMessageMetaData metaData) + { + final long id = _messageId.getAndIncrement(); + return new StoredMemoryMessage(id, metaData); + } + + @Override + public Transaction newTransaction() + { + return new Transaction() + { + private AtomicLong _storeSizeIncrease = new AtomicLong(); + + @Override + public StoreFuture commitTranAsync() throws AMQStoreException + { + QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue()); + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + { + _storeSizeIncrease.addAndGet(((MessageContentSource)message).getSize()); + } + + @Override + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + { + _storeSizeIncrease.addAndGet(-((MessageContentSource)message).getSize()); + } + + @Override + public void commitTran() throws AMQStoreException + { + QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue()); + } + + @Override + public void abortTran() throws AMQStoreException + { + } + + @Override + public void removeXid(long format, byte[] globalId, byte[] branchId) + { + } + + @Override + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + } + }; + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public void close() throws Exception + { + _stateManager.attainState(State.CLOSING); + _closed.getAndSet(true); + _stateManager.attainState(State.CLOSED); + } + + @Override + public void addEventListener(EventListener eventListener, Event... events) + { + _eventManager.addEventListener(eventListener, events); + } + + private void storedSizeChange(final int delta) + { + if(_persistentSizeHighThreshold > 0) + { + synchronized (this) + { + long newSize = _totalStoreSize += delta; + if(!_limitBusted && newSize > _persistentSizeHighThreshold) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + else if(_limitBusted && newSize < _persistentSizeHighThreshold) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + } + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java index bcee4e4930..cfe530750b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java @@ -1,8 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.store; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.JMSException; @@ -11,6 +31,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -18,26 +39,13 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.test.utils.QpidBrokerTestCase; -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ public class StoreOverfullTest extends QpidBrokerTestCase { - private static final int TIMEOUT = 10000; - public static final int TEST_SIZE = 150; + /** Number of messages to send*/ + public static final int TEST_SIZE = 15; + + /** Message payload*/ + private static final byte[] BYTE_32K = new byte[32*1024]; private Connection _producerConnection; private Connection _consumerConnection; @@ -47,23 +55,14 @@ public class StoreOverfullTest extends QpidBrokerTestCase private MessageConsumer _consumer; private Queue _queue; - //private final AtomicInteger sentMessages = new AtomicInteger(0); - - private static final int OVERFULL_SIZE = 4000000; - private static final int UNDERFULL_SIZE = 3500000; + private static final int OVERFULL_SIZE = 400000; + private static final int UNDERFULL_SIZE = 350000; public void setUp() throws Exception { - setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size", - String.valueOf(OVERFULL_SIZE)); - setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size", - String.valueOf(UNDERFULL_SIZE)); - - if(getTestProfileMessageStoreClassName().contains("BDB")) - { - setConfigurationProperty("virtualhosts.virtualhost.test.store.envConfig(1).name", "je.log.fileMax"); - setConfigurationProperty("virtualhosts.virtualhost.test.store.envConfig(1).value", "1000000"); - } + setConfigurationProperty("virtualhosts.virtualhost.test.store.class", QuotaMessageStore.class.getName()); + setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size", String.valueOf(OVERFULL_SIZE)); + setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size", String.valueOf(UNDERFULL_SIZE)); super.setUp(); @@ -90,7 +89,7 @@ public class StoreOverfullTest extends QpidBrokerTestCase } } - /* + /** * Test: * * Send > threshold amount of data : Sender is blocked @@ -105,9 +104,9 @@ public class StoreOverfullTest extends QpidBrokerTestCase _producer = _producerSession.createProducer(_queue); - sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + MessageSender sender = sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); - while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) { Thread.sleep(100l); } @@ -125,7 +124,7 @@ public class StoreOverfullTest extends QpidBrokerTestCase break; } } - + long targetTime = System.currentTimeMillis() + 5000l; while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime) { @@ -143,12 +142,12 @@ public class StoreOverfullTest extends QpidBrokerTestCase } assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE); - + assertNull("Unexpected exception on message sending:" + sender.getException(), sender.getException()); } - /* Two producers on different queues + /** + * Two producers on different queues */ - public void testCapacityExceededCausesBlockTwoConnections() throws Exception { AtomicInteger sentMessages = new AtomicInteger(0); @@ -168,7 +167,7 @@ public class StoreOverfullTest extends QpidBrokerTestCase sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); - while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) { Thread.sleep(100l); } @@ -176,19 +175,17 @@ public class StoreOverfullTest extends QpidBrokerTestCase assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); - while(!((AMQSession)secondProducerSession).isBrokerFlowControlled()) + while(!((AMQSession<?,?>)secondProducerSession).isFlowBlocked()) { Thread.sleep(100l); } int sentCount2 = sentMessages2.get(); assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); - _consumer = _consumerSession.createConsumer(_queue); MessageConsumer consumer2 = _consumerSession.createConsumer(queue2); _consumerConnection.start(); - for(int i = 0; i < 2*TEST_SIZE; i++) { if(_consumer.receive(1000l) == null @@ -202,10 +199,9 @@ public class StoreOverfullTest extends QpidBrokerTestCase assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); } - /* + /** * New producers are blocked */ - public void testCapacityExceededCausesBlockNewConnection() throws Exception { AtomicInteger sentMessages = new AtomicInteger(0); @@ -223,7 +219,7 @@ public class StoreOverfullTest extends QpidBrokerTestCase sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); - while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) { Thread.sleep(100l); } @@ -232,18 +228,16 @@ public class StoreOverfullTest extends QpidBrokerTestCase sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); - while(!((AMQSession)secondProducerSession).isBrokerFlowControlled()) + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) { Thread.sleep(100l); } int sentCount2 = sentMessages2.get(); assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); - _consumer = _consumerSession.createConsumer(_queue); _consumerConnection.start(); - for(int i = 0; i < 2*TEST_SIZE; i++) { if(_consumer.receive(2000l) == null) @@ -257,8 +251,6 @@ public class StoreOverfullTest extends QpidBrokerTestCase } - - private MessageSender sendMessagesAsync(final MessageProducer producer, final Session producerSession, final int numMessages, @@ -302,11 +294,11 @@ public class StoreOverfullTest extends QpidBrokerTestCase } } - public Exception awaitSenderException(long timeout) throws InterruptedException + public Exception getException() { - _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS); return _exception; } + } private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) @@ -318,7 +310,6 @@ public class StoreOverfullTest extends QpidBrokerTestCase producer.send(nextMessage(msg, producerSession)); sentMessages.incrementAndGet(); - try { ((AMQSession<?,?>)producerSession).sync(); @@ -340,14 +331,12 @@ public class StoreOverfullTest extends QpidBrokerTestCase } } - private static final byte[] BYTE_32K = new byte[32*1024]; - private Message nextMessage(int msg, Session producerSession) throws JMSException { BytesMessage send = producerSession.createBytesMessage(); send.writeBytes(BYTE_32K); send.setIntProperty("msg", msg); - return send; } + } |
