diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-05-29 11:37:11 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-05-29 11:37:11 +0000 |
| commit | 5a888989bd28402e3271b05bcc32a7410f8d17a2 (patch) | |
| tree | 469fe7c0a7a484cf2663a70d289e342497303f0b /qpid/java/broker/src | |
| parent | 739f8964cdb5baf786759fb900928206a404d3fa (diff) | |
| download | qpid-python-5a888989bd28402e3271b05bcc32a7410f8d17a2.tar.gz | |
QPID-3986: Improved tests and resolved some potential thread-safety issues
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343675 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
3 files changed, 250 insertions, 7 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index e12c6fa271..849aa05099 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -591,7 +591,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public List<AMQChannel> getChannels() { - return new ArrayList<AMQChannel>(_channelMap.values()); + synchronized (_channelMap) + { + return new ArrayList<AMQChannel>(_channelMap.values()); + } } public AMQChannel getAndAssertChannel(int channelId) throws AMQException @@ -651,6 +654,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr synchronized (_channelMap) { _channelMap.put(channel.getChannelId(), channel); + + if(_blocking) + { + channel.block(); + } } } @@ -659,11 +667,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _cachedChannels[channelId] = channel; } - if(_blocking) - { - channel.block(); - } - checkForNotification(); } @@ -790,7 +793,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr */ private void closeAllChannels() throws AMQException { - for (AMQChannel channel : _channelMap.values()) + for (AMQChannel channel : getChannels()) { channel.close(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java new file mode 100644 index 0000000000..f1976ecee3 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase implements EventListener, TransactionLogResource +{ + private static final Logger _logger = Logger.getLogger(MessageStoreQuotaEventsTestBase.class); + + protected static final byte[] MESSAGE_DATA = new byte[32 * 1024]; + + private MessageStore _store; + private File _storeLocation; + + private List<Event> _events; + private UUID _transactionResource; + + protected abstract MessageStore createStore() throws Exception; + + protected abstract void applyStoreSpecificConfiguration(XMLConfiguration config); + + protected abstract int getNumberOfMessagesToFillStore(); + + @Override + public void setUp() throws Exception + { + super.setUp(); + + _storeLocation = new File(new File(TMP_FOLDER), getTestName()); + FileUtils.delete(_storeLocation, true); + + XMLConfiguration config = new XMLConfiguration(); + config.addProperty("environment-path", _storeLocation.getAbsolutePath()); + applyStoreSpecificConfiguration(config); + + _store = createStore(); + _store.configureConfigStore("test", null, config); + + _transactionResource = UUID.randomUUID(); + _events = new ArrayList<Event>(); + _store.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + @Override + public void tearDown() throws Exception + { + super.tearDown(); + FileUtils.delete(_storeLocation, true); + } + + public void testOverflow() throws Exception + { + Transaction transaction = _store.newTransaction(); + + List<EnqueableMessage> messages = new ArrayList<EnqueableMessage>(); + for (int i = 0; i < getNumberOfMessagesToFillStore(); i++) + { + EnqueableMessage m = addMessage(i); + messages.add(m); + transaction.enqueueMessage(this, m); + } + transaction.commitTran(); + + assertEvent(1, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + + for (EnqueableMessage m : messages) + { + m.getStoredMessage().remove(); + } + + assertEvent(2, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + protected EnqueableMessage addMessage(long id) + { + MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString(getName()), false, false, + new AMQShortString(getName())); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); + props.setContentType(getTestName()); + + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, MESSAGE_DATA.length); + + MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1); + StoredMessage<MessageMetaData> handle = _store.addMessage(metaData); + handle.addContent(0, ByteBuffer.wrap(MESSAGE_DATA)); + TestMessage message = new TestMessage(id, handle); + return message; + } + + @Override + public void event(Event event) + { + _logger.debug("Test event listener received event " + event); + _events.add(event); + } + + private void assertEvent(int expectedNumberOfEvents, Event... expectedEvents) + { + assertEquals("Unexpected number of events received ", expectedNumberOfEvents, _events.size()); + for (Event event : expectedEvents) + { + assertTrue("Expected event is not found:" + event, _events.contains(event)); + } + } + + @Override + public UUID getId() + { + return _transactionResource; + } + + private static class TestMessage implements EnqueableMessage + { + private final StoredMessage<?> _handle; + private final long _messageId; + + public TestMessage(long messageId, StoredMessage<?> handle) + { + _messageId = messageId; + _handle = handle; + } + + public long getMessageNumber() + { + return _messageId; + } + + public boolean isPersistent() + { + return true; + } + + public StoredMessage<?> getStoredMessage() + { + return _handle; + } + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java new file mode 100644 index 0000000000..5d316fca43 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; + +public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase +{ + private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class); + + private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 10; + + /** + * Estimated using an assumption that a physical disk space occupied by a + * message is 3 times bigger then a message size + */ + private static final int OVERFULL_SIZE = (int) (MESSAGE_DATA.length * 3 * NUMBER_OF_MESSAGES_TO_OVERFILL_STORE * 0.8); + + private static final int UNDERFULL_SIZE = (int) (OVERFULL_SIZE * 0.8); + + @Override + protected int getNumberOfMessagesToFillStore() + { + return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; + } + + @Override + protected void applyStoreSpecificConfiguration(XMLConfiguration config) + { + _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + + config.addProperty("overfull-size", OVERFULL_SIZE); + config.addProperty("underfull-size", UNDERFULL_SIZE); + } + + @Override + protected MessageStore createStore() throws Exception + { + return new DerbyMessageStore(); + } +} |
