From 15b0ef9cab78cbe91f434a8e984fee09fe96bd69 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 25 Mar 2009 18:39:25 +0000 Subject: QPID-1735 : Added Documentation to QueueBackingStore around thread safety of load/unload, Updated FQBS to adhere to the thread safety specified by the interface. QueueEntry was updated to return the AMQMessage from the load() to complete the getMessage() interface. As in a flowed state the message may be purged before a reference can be taken. Added new Test QueueEntryImplThreadingTest that should later be run for longer but aims to show that load always returns the message even when unloads are occuring asynchronously. Commit from 0.5-release : r758388 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@758397 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/ExtractResendAndRequeueTest.java | 7 +++++ .../exchange/AbstractHeadersExchangeTestBase.java | 6 ++-- .../qpid/server/filter/PropertyExpressionTest.java | 8 ++++++ .../apache/qpid/server/queue/MockAMQMessage.java | 16 +++++++++++ .../org/apache/qpid/server/queue/MockAMQQueue.java | 32 +++++++++++++++------- .../server/security/access/ACLManagerTest.java | 7 +++++ 6 files changed, 62 insertions(+), 14 deletions(-) (limited to 'java/broker/src/test') diff --git a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java index 3ab127b59d..c370fd9867 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.AMQException; import java.util.Map; @@ -96,6 +97,12 @@ public class ExtractResendAndRequeueTest extends TestCase assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size()); } + public void tearDown() throws Exception + { + //Ensure we close the registry that the MockAMQQueue will create + ApplicationRegistry.getInstance().close(); + } + /** * Helper method to create a new subscription and aquire the given messages. * diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 22e49a0241..ee1796ba2f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -36,11 +36,9 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.FailedDequeueException; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageCleanupException; import org.apache.qpid.server.queue.MockProtocolSession; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.queue.UnableToFlowMessageException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; @@ -356,9 +354,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } - public void load() + public AMQMessage load() { - //To change body of implemented methods use File | Settings | File Templates. + return null; } public boolean isFlowed() diff --git a/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java b/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java index 9344efd4a8..e7b3f40393 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java @@ -23,10 +23,18 @@ package org.apache.qpid.server.filter; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.MockQueueEntry; +import org.apache.qpid.server.registry.ApplicationRegistry; public class PropertyExpressionTest extends TestCase { + public void tearDown() throws Exception + { + //Ensure we close the registry that the MockQueueEntry will create + ApplicationRegistry.remove(1); + } + + public void testJMSRedelivered() { PropertyExpression pe = new PropertyExpression("JMSRedelivered"); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java index b38da53406..11049a7ae3 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java @@ -23,6 +23,14 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl; + +import java.util.LinkedList; +import java.util.ArrayList; public class MockAMQMessage extends TransientAMQMessage { @@ -31,6 +39,14 @@ public class MockAMQMessage extends TransientAMQMessage { super(messageId); _messagePublishInfo = new MessagePublishInfoImpl(null,false,false,null); + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + + properties.setMessageId(String.valueOf(messageId)); + properties.setTimestamp(System.currentTimeMillis()); + properties.setDeliveryMode((byte)1); + + _contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID); + _contentBodies = new ArrayList(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index d9e4cc9b70..ff814840bc 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -20,16 +20,18 @@ */ package org.apache.qpid.server.queue; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.AMQException; -import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; import java.util.List; import java.util.Set; @@ -39,10 +41,20 @@ public class MockAMQQueue implements AMQQueue private boolean _deleted = false; private int _queueCount; private AMQShortString _name; + private VirtualHost _virtualhost; public MockAMQQueue(String name) { - _name = new AMQShortString(name); + _name = new AMQShortString(name); + _virtualhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + try + { + _virtualhost.getQueueRegistry().registerQueue(this); + } + catch (AMQException e) + { + e.printStackTrace(); + } } public AMQShortString getName() @@ -67,7 +79,7 @@ public class MockAMQQueue implements AMQQueue public VirtualHost getVirtualHost() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return _virtualhost; } public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException @@ -152,7 +164,7 @@ public class MockAMQQueue implements AMQQueue public int delete() throws AMQException { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; //To change body of implemented methods use File | Settings | File Templates. } public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException @@ -343,7 +355,7 @@ public class MockAMQQueue implements AMQQueue public void setMinimumAlertRepeatGap(long value) { - + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java index 6c6835ccca..abcd9855d9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockProtocolSession; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.registry.ApplicationRegistry; public class ACLManagerTest extends TestCase { @@ -67,6 +68,12 @@ public class ACLManagerTest extends TestCase _session = new MockProtocolSession(new TestableMemoryMessageStore()); } + + public void tearDown() throws Exception + { + //Ensure we close the registry that the MockAMQQueue will create + ApplicationRegistry.getInstance().close(); + } public void testACLManagerConfigurationPluginManager() throws Exception { -- cgit v1.2.1