summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-05-29 11:37:11 +0000
committerRobert Gemmell <robbie@apache.org>2012-05-29 11:37:11 +0000
commit5a888989bd28402e3271b05bcc32a7410f8d17a2 (patch)
tree469fe7c0a7a484cf2663a70d289e342497303f0b /qpid/java/broker/src
parent739f8964cdb5baf786759fb900928206a404d3fa (diff)
downloadqpid-python-5a888989bd28402e3271b05bcc32a7410f8d17a2.tar.gz
QPID-3986: Improved tests and resolved some potential thread-safety issues
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343675 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java17
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java178
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java62
3 files changed, 250 insertions, 7 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index e12c6fa271..849aa05099 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -591,7 +591,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
public List<AMQChannel> getChannels()
{
- return new ArrayList<AMQChannel>(_channelMap.values());
+ synchronized (_channelMap)
+ {
+ return new ArrayList<AMQChannel>(_channelMap.values());
+ }
}
public AMQChannel getAndAssertChannel(int channelId) throws AMQException
@@ -651,6 +654,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
synchronized (_channelMap)
{
_channelMap.put(channel.getChannelId(), channel);
+
+ if(_blocking)
+ {
+ channel.block();
+ }
}
}
@@ -659,11 +667,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_cachedChannels[channelId] = channel;
}
- if(_blocking)
- {
- channel.block();
- }
-
checkForNotification();
}
@@ -790,7 +793,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
*/
private void closeAllChannels() throws AMQException
{
- for (AMQChannel channel : _channelMap.values())
+ for (AMQChannel channel : getChannels())
{
channel.close();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
new file mode 100644
index 0000000000..f1976ecee3
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+
+public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase implements EventListener, TransactionLogResource
+{
+ private static final Logger _logger = Logger.getLogger(MessageStoreQuotaEventsTestBase.class);
+
+ protected static final byte[] MESSAGE_DATA = new byte[32 * 1024];
+
+ private MessageStore _store;
+ private File _storeLocation;
+
+ private List<Event> _events;
+ private UUID _transactionResource;
+
+ protected abstract MessageStore createStore() throws Exception;
+
+ protected abstract void applyStoreSpecificConfiguration(XMLConfiguration config);
+
+ protected abstract int getNumberOfMessagesToFillStore();
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _storeLocation = new File(new File(TMP_FOLDER), getTestName());
+ FileUtils.delete(_storeLocation, true);
+
+ XMLConfiguration config = new XMLConfiguration();
+ config.addProperty("environment-path", _storeLocation.getAbsolutePath());
+ applyStoreSpecificConfiguration(config);
+
+ _store = createStore();
+ _store.configureConfigStore("test", null, config);
+
+ _transactionResource = UUID.randomUUID();
+ _events = new ArrayList<Event>();
+ _store.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ FileUtils.delete(_storeLocation, true);
+ }
+
+ public void testOverflow() throws Exception
+ {
+ Transaction transaction = _store.newTransaction();
+
+ List<EnqueableMessage> messages = new ArrayList<EnqueableMessage>();
+ for (int i = 0; i < getNumberOfMessagesToFillStore(); i++)
+ {
+ EnqueableMessage m = addMessage(i);
+ messages.add(m);
+ transaction.enqueueMessage(this, m);
+ }
+ transaction.commitTran();
+
+ assertEvent(1, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+
+ for (EnqueableMessage m : messages)
+ {
+ m.getStoredMessage().remove();
+ }
+
+ assertEvent(2, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+ protected EnqueableMessage addMessage(long id)
+ {
+ MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString(getName()), false, false,
+ new AMQShortString(getName()));
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+ props.setContentType(getTestName());
+
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, MESSAGE_DATA.length);
+
+ MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1);
+ StoredMessage<MessageMetaData> handle = _store.addMessage(metaData);
+ handle.addContent(0, ByteBuffer.wrap(MESSAGE_DATA));
+ TestMessage message = new TestMessage(id, handle);
+ return message;
+ }
+
+ @Override
+ public void event(Event event)
+ {
+ _logger.debug("Test event listener received event " + event);
+ _events.add(event);
+ }
+
+ private void assertEvent(int expectedNumberOfEvents, Event... expectedEvents)
+ {
+ assertEquals("Unexpected number of events received ", expectedNumberOfEvents, _events.size());
+ for (Event event : expectedEvents)
+ {
+ assertTrue("Expected event is not found:" + event, _events.contains(event));
+ }
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _transactionResource;
+ }
+
+ private static class TestMessage implements EnqueableMessage
+ {
+ private final StoredMessage<?> _handle;
+ private final long _messageId;
+
+ public TestMessage(long messageId, StoredMessage<?> handle)
+ {
+ _messageId = messageId;
+ _handle = handle;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public StoredMessage<?> getStoredMessage()
+ {
+ return _handle;
+ }
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
new file mode 100644
index 0000000000..5d316fca43
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.derby;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase;
+
+public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase
+{
+ private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class);
+
+ private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 10;
+
+ /**
+ * Estimated using an assumption that a physical disk space occupied by a
+ * message is 3 times bigger then a message size
+ */
+ private static final int OVERFULL_SIZE = (int) (MESSAGE_DATA.length * 3 * NUMBER_OF_MESSAGES_TO_OVERFILL_STORE * 0.8);
+
+ private static final int UNDERFULL_SIZE = (int) (OVERFULL_SIZE * 0.8);
+
+ @Override
+ protected int getNumberOfMessagesToFillStore()
+ {
+ return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE;
+ }
+
+ @Override
+ protected void applyStoreSpecificConfiguration(XMLConfiguration config)
+ {
+ _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE);
+
+ config.addProperty("overfull-size", OVERFULL_SIZE);
+ config.addProperty("underfull-size", UNDERFULL_SIZE);
+ }
+
+ @Override
+ protected MessageStore createStore() throws Exception
+ {
+ return new DerbyMessageStore();
+ }
+}