diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-04-03 17:54:44 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-03 17:54:44 +0000 |
| commit | 590feae56049782150cc54b46158b77b94c53cdf (patch) | |
| tree | e81e4f01cabe32894eecfcf8e24ed670e2aea520 /java/broker | |
| parent | 9c80608ac4e50745f82c4ff75f325ff87bc6ceb9 (diff) | |
| download | qpid-python-590feae56049782150cc54b46158b77b94c53cdf.tar.gz | |
QPID-1764 : Updated all tests to use the TestTransactionLog interface and split testing code into subclasses. TestableTransactionLog will now correctly wrap a TransactionLog for testing. To enable testing of the BaseTransactionLog a TestableBaseTransactionLog was needed to only return values that are actually stored in the BaseTL the TestableTransactionLog actually stores single enqueues so that they can be queried by the test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@761741 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
16 files changed, 397 insertions, 261 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index d57b81c362..f5819716cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -51,7 +51,7 @@ import java.util.concurrent.atomic.AtomicLong; */ public class MemoryMessageStore implements TransactionLog, RoutingTable { - private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); + protected static final Logger _log = Logger.getLogger(MemoryMessageStore.class); private static final int DEFAULT_HASHTABLE_CAPACITY = 50000; @@ -154,13 +154,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException { - for (AMQQueue q : queues) - { - if (q.isDurable()) - { - enqueueMessage(context,q,messageId); - } - } + // Not required to do anything } public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException @@ -232,25 +226,13 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable _metaDataMap.put(messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException - { - checkNotClosed(); - return _metaDataMap.get(messageId); - } - - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException - { - checkNotClosed(); - List<ContentChunk> bodyList = _contentBodyMap.get(messageId); - return bodyList.get(index); - } public boolean isPersistent() { return false; } - private void checkNotClosed() throws MessageStoreClosedException + protected void checkNotClosed() throws MessageStoreClosedException { if (_closed.get()) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 52b8b0ad19..e034143596 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -103,7 +103,7 @@ public class TxAckTest extends TestCase private final List<Long> _unacked; private StoreContext _storeContext = new StoreContext(); private AMQQueue _queue; - private TransactionLog _transactionLog = new TestableMemoryMessageStore(); + private TransactionLog _transactionLog = new TestableMemoryMessageStore().configure(); private static final int MESSAGE_SIZE=100; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 89dbc4f959..6ae2324e5f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactory; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; @@ -43,14 +44,13 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.mina.common.ByteBuffer; import javax.management.JMException; import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; /** * Test class to test AMQQueueMBean attribtues and operations @@ -70,7 +70,7 @@ public class AMQQueueMBeanTest extends TestCase public void testMessageCountTransient() throws Exception { int messageCount = 10; - sendMessages(messageCount, false); + List<AMQMessage> messages = sendMessages(messageCount, false); assertTrue(_queueMBean.getMessageCount() == messageCount); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); long queueDepth = (messageCount * MESSAGE_SIZE); @@ -85,13 +85,13 @@ public class AMQQueueMBeanTest extends TestCase assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); //Ensure that the data has been removed from the Store - verifyBrokerState(); + verifyBrokerState(messages); } public void testMessageCountPersistent() throws Exception { int messageCount = 10; - sendMessages(messageCount, true); + List<AMQMessage> messages = sendMessages(messageCount, true); assertEquals("", messageCount, _queueMBean.getMessageCount().intValue()); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); long queueDepth = (messageCount * MESSAGE_SIZE); @@ -106,20 +106,38 @@ public class AMQQueueMBeanTest extends TestCase assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); //Ensure that the data has been removed from the Store - verifyBrokerState(); + verifyBrokerState(messages); } // todo: collect to a general testing class -duplicated from Systest/MessageReturntest - private void verifyBrokerState() + private void verifyBrokerState(List<AMQMessage> messages) { - TestableMemoryMessageStore store = new TestableMemoryMessageStore(_virtualHost.getTransactionLog()); + TestableTransactionLog store = new TestableTransactionLog(_virtualHost.getTransactionLog()); - // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. - assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap()); - assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size()); - assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap()); - assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size()); + // We can only now check messageData and ConentBodyChunks by MessageID. + for (AMQMessage message : messages) + { + // Check we have no message metadata for the messages we sent + try + { + assertNull(store.getMessageMetaData(new StoreContext(), message.getMessageId())); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + + try + { + assertNull(store.getContentBodyChunk(new StoreContext(), message.getMessageId(),0)); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + + } } public void testConsumerCount() throws AMQException @@ -297,8 +315,9 @@ public class AMQQueueMBeanTest extends TestCase ApplicationRegistry.remove(1); } - private void sendMessages(int messageCount, boolean persistent) throws AMQException + private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException { + List<AMQMessage> messages = new LinkedList<AMQMessage>(); for (int i = 0; i < messageCount; i++) { IncomingMessage currentMessage = message(false, persistent); @@ -316,9 +335,10 @@ public class AMQQueueMBeanTest extends TestCase .convertToContentChunk( new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), MESSAGE_SIZE))); - currentMessage.deliverToQueues(); + messages.add(currentMessage.deliverToQueues()); } + return messages; } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java index 3280516b56..58073e52b6 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -30,6 +30,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; @@ -38,8 +39,6 @@ import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.NullApplicationRegistry; @@ -47,6 +46,7 @@ import org.apache.qpid.server.util.NullApplicationRegistry; import java.util.ArrayList; import java.util.LinkedList; import java.util.Set; +import java.util.List; /** * Tests that acknowledgements are handled correctly. @@ -59,7 +59,7 @@ public class AckTest extends TestCase private MockProtocolSession _protocolSession; - private TestableMemoryMessageStore _messageStore; + private TestableTransactionLog _transactionLog; private StoreContext _storeContext = new StoreContext(); @@ -75,9 +75,9 @@ public class AckTest extends TestCase ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); - _messageStore = new TestableMemoryMessageStore(vhost.getTransactionLog()); - _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); + _transactionLog = new TestableTransactionLog(vhost.getTransactionLog()); + _protocolSession = new MockProtocolSession(_transactionLog); + _channel = new AMQChannel(_protocolSession,5, _transactionLog /*dont need exchange registry*/); _protocolSession.addChannel(_channel); @@ -95,13 +95,13 @@ public class AckTest extends TestCase publishMessages(count, false); } - private void publishMessages(int count, boolean persistent) throws AMQException + private List<AMQMessage> publishMessages(int count, boolean persistent) throws AMQException { - TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, + TransactionalContext txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList<RequiredDeliveryException>() ); _queue.registerSubscription(_subscription,false); - MessageFactory factory = MessageFactory.getInstance(); + List<AMQMessage> sentMessages = new LinkedList<AMQMessage>(); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -109,7 +109,7 @@ public class AckTest extends TestCase MessagePublishInfo publishBody = new MessagePublishInfoImpl(new AMQShortString("someExchange"), false, false, new AMQShortString("rk")); - IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _messageStore); + IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _transactionLog); //IncomingMessage msg2 = null; if (persistent) { @@ -130,14 +130,16 @@ public class AckTest extends TestCase ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); msg.enqueue(qs); - msg.routingComplete(_messageStore); + msg.routingComplete(_transactionLog); if(msg.allContentReceived()) { - msg.deliverToQueues(); + sentMessages.add(msg.deliverToQueues()); } // we manually send the message to the subscription //_subscription.send(new QueueEntry(_queue,msg), _queue); } + + return sentMessages; } /** @@ -148,11 +150,16 @@ public class AckTest extends TestCase { _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); final int msgCount = 10; - publishMessages(msgCount, true); + List<AMQMessage> sentMessages = publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); + for (AMQMessage message : sentMessages) + { + List<AMQQueue> enqueuedQueues = _transactionLog.getMessageReferenceMap(message.getMessageId()); + assertNotNull("Expected message to be enqueued",enqueuedQueues); + assertEquals("Message is not enqueued on expected number of queues.",1, enqueuedQueues.size()); + } Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -165,7 +172,6 @@ public class AckTest extends TestCase } assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); } /** @@ -180,8 +186,8 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - assertTrue(_messageStore.getContentBodyMap().size() == 0); + assertEquals("There was more MetaData objects than expected", 0, _transactionLog.getMessageMetaDataSize()); +// assertTrue(_messageStore.getContentBodyMap().size() == 0);to be } @@ -197,8 +203,8 @@ public class AckTest extends TestCase UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); - assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0); - assertTrue(_messageStore.getContentBodyMap().size() == 0); + assertEquals("There was more MetaData objects than expected", 0, _transactionLog.getMessageMetaDataSize()); +// assertTrue(_messageStore.getContentBodyMap().size() == 0); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java index d007913a4f..4b4c404229 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -31,10 +32,10 @@ import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.commons.configuration.PropertiesConfiguration; import java.util.ArrayList; import java.util.LinkedList; @@ -42,7 +43,7 @@ import java.util.List; public class PersistentMessageTest extends TransientMessageTest { - private TestableMemoryMessageStore _messageStore; + private TestableTransactionLog _transactionLog; protected SimpleAMQQueue _queue; protected AMQShortString _q1name = new AMQShortString("q1name"); @@ -54,22 +55,22 @@ public class PersistentMessageTest extends TransientMessageTest public void setUp() throws Exception { - _messageStore = new TestableMemoryMessageStore(); + _transactionLog = new TestableTransactionLog(new TestableMemoryMessageStore().configure()); _storeContext = new StoreContext(); VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration(PersistentMessageTest.class.getName(), new PropertiesConfiguration()), - _messageStore); + _transactionLog); _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null); // Create IncomingMessage and nondurable queue - _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages); + _messageDeliveryContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, _returnMessages); } @Override protected AMQMessage newMessage() { - return MessageFactory.getInstance().createMessage(_messageStore, true); + return MessageFactory.getInstance().createMessage(_transactionLog, true); } @Override @@ -82,7 +83,7 @@ public class PersistentMessageTest extends TransientMessageTest /** * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right - * contents. TransactionLog = Empty, returnMessages 1 item. + * contents. TransactionLog = Empty, returnMessages 1 item. * * @throws Exception */ @@ -98,17 +99,16 @@ public class PersistentMessageTest extends TransientMessageTest // equivalent to amqChannel.routeMessage() msg.enqueue(qs); - msg.routingComplete(_messageStore); + msg.routingComplete(_transactionLog); // equivalent to amqChannel.deliverCurrentMessageIfComplete msg.deliverToQueues(); // Check that data has been stored to disk long messageId = msg.getMessageId(); - checkMessageMetaDataExists(messageId); // Check that it was not enqueued - List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId); + List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId); assertTrue("TransactionLog contains a queue reference for this messageID:" + messageId, queueList == null || queueList.isEmpty()); checkMessageMetaDataRemoved(messageId); @@ -118,7 +118,7 @@ public class PersistentMessageTest extends TransientMessageTest protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException { IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext, - new MockProtocolSession(_messageStore), _messageStore); + new MockProtocolSession(_transactionLog), _transactionLog); // equivalent to amqChannel.publishContenHeader ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); @@ -138,7 +138,8 @@ public class PersistentMessageTest extends TransientMessageTest { try { - _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId); + assertNotNull("Message MetaData does not exist for message:" + messageId, + _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); } catch (AMQException amqe) { @@ -151,8 +152,8 @@ public class PersistentMessageTest extends TransientMessageTest try { assertNull("Message MetaData still exists for message:" + messageId, - _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); - List ids = _messageStore.getMessageReferenceMap(messageId); + _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId)); + List ids = _transactionLog.getMessageReferenceMap(messageId); assertTrue("Message still has values in the reference map:" + messageId, ids == null || ids.isEmpty()); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index d5e873ebc0..4e7bad06ae 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -35,12 +35,13 @@ import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestTransactionLog; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import java.util.ArrayList; import java.util.List; @@ -50,7 +51,7 @@ public class SimpleAMQQueueTest extends TestCase protected SimpleAMQQueue _queue; protected VirtualHost _virtualHost; - protected TestableMemoryMessageStore _transactionLog = new TestableMemoryMessageStore(); + protected TestableTransactionLog _transactionLog; protected AMQShortString _qname = new AMQShortString("qname"); protected AMQShortString _owner = new AMQShortString("owner"); protected AMQShortString _routingKey = new AMQShortString("routing key"); @@ -68,6 +69,7 @@ public class SimpleAMQQueueTest extends TestCase //Create Application Registry for test ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1); + _transactionLog = new TestableTransactionLog(new MemoryMessageStore().configure()); PropertiesConfiguration env = new PropertiesConfiguration(); _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); @@ -340,7 +342,9 @@ public class SimpleAMQQueueTest extends TestCase // Check that it is enqueued List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId); - assertNotNull(data); + assertNotNull("Message has no enqueued information.", data); + assertTrue("Message is not enqueued on correct queue.", data.contains(_queue)); + assertEquals("Message not enqueued on the right queues.", 1, data.size()); // Dequeue message ContentHeaderBody header = new ContentHeaderBody(); @@ -355,7 +359,7 @@ public class SimpleAMQQueueTest extends TestCase // Check that it is dequeued data = _transactionLog.getMessageReferenceMap(messageId); - assertTrue(data == null || data.isEmpty()); + assertNull("Message still has enqueue data.", data); } public void testMessagesFlowToDisk() throws AMQException, InterruptedException @@ -509,7 +513,9 @@ public class SimpleAMQQueueTest extends TestCase //Check message was correctly enqueued List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId); - assertNotNull(data); + assertNotNull("Message has no enqueued information.", data); + assertTrue("Message is not enqueued on correct queue.", data.contains(_queue)); + assertEquals("Message not enqueued on the right queues.", 1, data.size()); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java index abcd9855d9..3a4746eb2c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java @@ -39,7 +39,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockProtocolSession; -import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.registry.ApplicationRegistry; public class ACLManagerTest extends TestCase @@ -66,7 +66,7 @@ public class ACLManagerTest extends TestCase _pluginManager = new MockPluginManager(""); _authzManager = new ACLManager(_conf, _pluginManager); - _session = new MockProtocolSession(new TestableMemoryMessageStore()); + _session = new MockProtocolSession(new MemoryMessageStore().configure()); } public void tearDown() throws Exception diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java index ff1fb8c97d..251f6d45f7 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java @@ -37,7 +37,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.TestIoSession; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; -import org.apache.qpid.server.store.TestableMemoryMessageStore; + import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -81,14 +81,12 @@ public class FirewallPluginTest extends TestCase } } - private TestableMemoryMessageStore _store; private VirtualHost _virtualHost; private AMQMinaProtocolSession _session; @Override public void setUp() throws Exception { - _store = new TestableMemoryMessageStore(); PropertiesConfiguration env = new PropertiesConfiguration(); _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env)); TestIoSession iosession = new TestIoSession(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 5a4c435e59..4c03a57cc8 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -26,28 +26,24 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.server.queue.MessageFactory; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.MessageFactory; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; -/** - * Tests that reference counting works correctly with AMQMessage and the message store - */ +/** Tests that reference counting works correctly with AMQMessage and the message store */ public class TestReferenceCounting extends TestCase { - private TestableMemoryMessageStore _store; + private TestableTransactionLog _store; private StoreContext _storeContext = new StoreContext(); - protected void setUp() throws Exception { super.setUp(); - _store = new TestableMemoryMessageStore(); + _store = new TestableTransactionLog(new TestableMemoryMessageStore().configure()); } - /** - * Check that when the reference count is decremented the message removes itself from the store - */ + /** Check that when the reference count is decremented the message removes itself from the store */ public void testMessageGetsRemoved() throws AMQException { ContentHeaderBody chb = createPersistentContentHeader(); @@ -57,14 +53,15 @@ public class TestReferenceCounting extends TestCase AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - assertEquals(1, _store.getMessageMetaDataMap().size()); + assertNotNull("Message Metadata did not exist for new message", + _store.getMessageMetaData(new StoreContext(), message.getMessageId())); } private ContentHeaderBody createPersistentContentHeader() { ContentHeaderBody chb = new ContentHeaderBody(); BasicContentHeaderProperties bchp = new BasicContentHeaderProperties(); - bchp.setDeliveryMode((byte)2); + bchp.setDeliveryMode((byte) 2); chb.properties = bchp; return chb; } @@ -77,8 +74,9 @@ public class TestReferenceCounting extends TestCase final ContentHeaderBody chb = createPersistentContentHeader(); AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true); message.setPublishAndContentHeaderBody(_storeContext, info, chb); - - assertEquals(1, _store.getMessageMetaDataMap().size()); + + assertNotNull("Message Metadata did not exist for new message", + _store.getMessageMetaData(new StoreContext(), message.getMessageId())); } public static junit.framework.Test suite() diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java index 38d139e94c..5d0fdfb727 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java @@ -21,12 +21,23 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.BaseTransactionLog; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; import java.util.Map; import java.util.List; public interface TestTransactionLog extends TransactionLog { + public void setBaseTransactionLog(BaseTransactionLog base); + public List<AMQQueue> getMessageReferenceMap(Long messageID); + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException; + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; + public long getMessageMetaDataSize(); + public TransactionLog getDelegate(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index fa5cdc1aa5..2099181a76 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -20,192 +20,80 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.server.routing.RoutingTable; import org.apache.qpid.server.transactionlog.BaseTransactionLog; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import org.apache.qpid.server.transactionlog.TransactionLog; -import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentMap; /** Adds some extra methods to the memory message store for testing purposes. */ -public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable +public class TestableMemoryMessageStore extends MemoryMessageStore implements TestTransactionLog { - private TransactionLog _transactionLog; - private RoutingTable _routingTable; - private MemoryMessageStore _mms; + private TestableTransactionLog _base; - public TestableMemoryMessageStore(TransactionLog log) + public void setBaseTransactionLog(BaseTransactionLog base) { - _transactionLog = log; - if (log instanceof BaseTransactionLog) + if (!(base instanceof TestableTransactionLog)) { - TransactionLog delegate = ((BaseTransactionLog) log).getDelegate(); - if (delegate instanceof RoutingTable) - { - _routingTable = (RoutingTable) delegate; - } - else - { - throw new RuntimeException("Specified BaseTransactionLog does not delegate to a RoutingTable:" + log); - } - - if (delegate instanceof MemoryMessageStore) - { - _mms = (MemoryMessageStore) delegate; - } - - } - else - { - throw new RuntimeException("Specified BaseTransactionLog is not testable:" + log); + throw new RuntimeException("base must be a TestableTransactionLog for correct operation in a TestMemoryMessageStore"); } + _base = (TestableTransactionLog) base; } - public TestableMemoryMessageStore(MemoryMessageStore mms) - { - _routingTable = mms; - _transactionLog = mms.configure(); - } - - public TestableMemoryMessageStore() - { - _mms = new MemoryMessageStore(); - _transactionLog = _mms.configure(); - _routingTable = _mms; - } - - public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() - { - return ((MemoryMessageStore) _routingTable)._metaDataMap; - } - - public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() - { - return ((MemoryMessageStore) _routingTable)._contentBodyMap; - } - - public List<AMQQueue> getMessageReferenceMap(Long messageId) - { -// return _mms._messageEnqueueMap.get(messageId); -// ((BaseTransactionLog)_transactionLog). - return new ArrayList<AMQQueue>(); - } - - public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + @Override + public TransactionLog configure() { - _transactionLog = (TransactionLog) _transactionLog.configure(virtualHost, base, config); - return _transactionLog; - } + BaseTransactionLog base = (BaseTransactionLog) super.configure(); - public void close() throws Exception - { - _transactionLog.close(); - _routingTable.close(); - } + _base = new TestableTransactionLog(base.getDelegate()); - public void createExchange(Exchange exchange) throws AMQException - { - _routingTable.createExchange(exchange); + return _base; } - public void removeExchange(Exchange exchange) throws AMQException + @Override + public TransactionLog configure(String base, VirtualHostConfiguration config) { - _routingTable.removeExchange(exchange); - } - - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - _routingTable.bindQueue(exchange, routingKey, queue, args); - } - - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - _routingTable.unbindQueue(exchange, routingKey, queue, args); - } - - public void createQueue(AMQQueue queue) throws AMQException - { - _routingTable.createQueue(queue); - } - - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException - { - _routingTable.createQueue(queue, arguments); - } - - public void removeQueue(AMQQueue queue) throws AMQException - { - _routingTable.removeQueue(queue); - } - - public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException - { - _transactionLog.enqueueMessage(context, queues, messageId); - } - - public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException - { - _transactionLog.dequeueMessage(context, queue, messageId); - } - - public void removeMessage(StoreContext context, Long messageId) throws AMQException - { - _transactionLog.removeMessage(context, messageId); - } - - public void beginTran(StoreContext context) throws AMQException - { - _transactionLog.beginTran(context); - } + //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable' + if (base.equals("store")) + { + super.configure(); - public void commitTran(StoreContext context) throws AMQException - { - _transactionLog.commitTran(context); - } + _base = new TestableTransactionLog(this); - public void abortTran(StoreContext context) throws AMQException - { - _transactionLog.abortTran(context); - } + return _base; + } - public boolean inTran(StoreContext context) - { - return _transactionLog.inTran(context); + return super.configure(); } - public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException + public List<AMQQueue> getMessageReferenceMap(Long messageId) { - _transactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody); + return _base.getMessageReferenceMap(messageId); } - public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) { - _transactionLog.storeMessageMetaData(context, messageId, messageMetaData); + return _metaDataMap.get(messageId); } - public boolean isPersistent() + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) { - return _transactionLog.isPersistent(); + List<ContentChunk> bodyList = _contentBodyMap.get(messageId); + return bodyList.get(index); } - public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + public long getMessageMetaDataSize() { - return _mms.getMessageMetaData(context, messageId); + return _metaDataMap.size(); } - public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + public TransactionLog getDelegate() { - return _mms.getContentBodyChunk(context, messageId, index); + return _base; } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java index 0a2a1c2327..a0c38ff0ad 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockContentChunk; import org.apache.qpid.server.queue.MockPersistentAMQMessage; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestTransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; @@ -47,14 +48,14 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>(); final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>(); - TestableTransactionLog _transactionLog; + TestTransactionLog _transactionLog; private ArrayList<AMQQueue> _queues; private MockPersistentAMQMessage _message; public void setUp() throws Exception { super.setUp(); - _transactionLog = new TestableTransactionLog(this); + _transactionLog = new TestableBaseTransactionLog(this); } public void testSingleEnqueueNoTransactional() throws AMQException diff --git a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java new file mode 100644 index 0000000000..92bc44da0b --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transactionlog; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestTransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.List; + +public class TestableBaseTransactionLog extends BaseTransactionLog implements TestTransactionLog +{ + + public TestableBaseTransactionLog() + { + super(null); + } + + public TestableBaseTransactionLog(TransactionLog delegate) + { + super(delegate); + if (delegate instanceof BaseTransactionLog) + { + _delegate = ((BaseTransactionLog) delegate).getDelegate(); + } + + } + + @Override + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + { + if (_delegate != null) + { + TransactionLog configuredLog = (TransactionLog) _delegate.configure(virtualHost, base, config); + + // Unwrap any BaseTransactionLog + if (configuredLog instanceof BaseTransactionLog) + { + _delegate = ((BaseTransactionLog) configuredLog).getDelegate(); + } + } + else + { + String delegateClass = config.getStoreConfiguration().getString("delegate"); + Class clazz = Class.forName(delegateClass); + Object o = clazz.newInstance(); + + if (!(o instanceof TransactionLog)) + { + throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz + + " does not."); + } + _delegate = (TransactionLog) o; + + // If a TransactionLog uses the BaseTransactionLog then it will return this object. + _delegate.configure(virtualHost, base, config); + } + return this; + } + + public void setBaseTransactionLog(BaseTransactionLog base) + { + throw new RuntimeException("TestableTransactionLog is unable to swap BaseTransactionLogs"); + } + + public List<AMQQueue> getMessageReferenceMap(Long messageID) + { + return _idToQueues.get(messageID); + } + + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getMessageMetaData(context, messageId); + } + else + { + return null; + } + } + + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getContentBodyChunk(context, messageId, index); + } + else + { + return null; + } + } + + public long getMessageMetaDataSize() + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getMessageMetaDataSize(); + } + else + { + return 0; + } + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java index b0c47052b2..38e17c3a07 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java +++ b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java @@ -20,52 +20,95 @@ */ package org.apache.qpid.server.transactionlog; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.store.TestTransactionLog; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestTransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.LinkedList; +import java.util.Map; public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog { - - List<Long> _singleEnqueues = new LinkedList<Long>(); + protected Map<Long, List<AMQQueue>> _singleEnqueuedIDstoQueue = new HashMap<Long, List<AMQQueue>>(); public TestableTransactionLog() { super(null); } - public TestableTransactionLog(BaseTransactionLog delegate) + public TestableTransactionLog(TransactionLog delegate) { - super(delegate.getDelegate()); + super(delegate); + if (delegate instanceof BaseTransactionLog) + { + _delegate = ((BaseTransactionLog) delegate).getDelegate(); + } + } - public TestableTransactionLog(TransactionLog delegate) + /** + * Override the BaseTranasactionLog to record the single enqueues of a message so we can perform references counting + * + * @param context The transactional context for the operation. + * @param queues + * @param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException + * + * @throws AMQException + */ + @Override + public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException { - super(delegate); + if (queues.size() == 1) + { + _singleEnqueuedIDstoQueue.put(messageId, queues); + } + + super.enqueueMessage(context, queues, messageId); } + /** + * Override the BaseTranasactionLog to record the single enqueues of a message so we can perform references counting + * + * @param context The transactional context for the operation. + * @param queue + * @param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException + * + * @throws AMQException + */ + @Override + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + { + if (_singleEnqueuedIDstoQueue.containsKey(messageId)) + { + _singleEnqueuedIDstoQueue.remove(messageId); + } + + super.dequeueMessage(context, queue, messageId); + } @Override public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { if (_delegate != null) { - TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config); + TransactionLog configuredLog = (TransactionLog) _delegate.configure(virtualHost, base, config); // Unwrap any BaseTransactionLog if (configuredLog instanceof BaseTransactionLog) { - _delegate = ((BaseTransactionLog)configuredLog).getDelegate(); + _delegate = ((BaseTransactionLog) configuredLog).getDelegate(); } } else { - String delegateClass = config.getStoreConfiguration().getString("delegate"); + String delegateClass = config.getStoreConfiguration().getString("delegate"); Class clazz = Class.forName(delegateClass); Object o = clazz.newInstance(); @@ -77,13 +120,61 @@ public class TestableTransactionLog extends BaseTransactionLog implements TestTr _delegate = (TransactionLog) o; // If a TransactionLog uses the BaseTransactionLog then it will return this object. - _delegate.configure(virtualHost, base, config); + _delegate.configure(virtualHost, base, config); } return this; } + public void setBaseTransactionLog(BaseTransactionLog base) + { + throw new RuntimeException("TestableTransactionLog is unable to swap BaseTransactionLogs"); + } + public List<AMQQueue> getMessageReferenceMap(Long messageID) { - return _idToQueues.get(messageID); + List<AMQQueue> result = _idToQueues.get(messageID); + + if (result == null) + { + result = _singleEnqueuedIDstoQueue.get(messageID); + } + + return result; + } + + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getMessageMetaData(context, messageId); + } + else + { + return null; + } + } + + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getContentBodyChunk(context, messageId, index); + } + else + { + return null; + } + } + + public long getMessageMetaDataSize() + { + if (_delegate instanceof TestTransactionLog) + { + return ((TestTransactionLog) _delegate).getMessageMetaDataSize(); + } + else + { + return 0; + } } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index cdc7eabf04..dbd05b9598 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -34,8 +34,10 @@ import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.TestTransactionLog; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -62,7 +64,11 @@ public class InternalBrokerBaseCase extends TestCase { super.setUp(); PropertiesConfiguration configuration = new PropertiesConfiguration(); - configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName()); + // This configuration is not used as TestApplicationRegistry just creates a single vhost 'test' with + // TransactionLog TestableTransactionLog(TestMemoryMessageStore) + configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableTransactionLog.class.getName()); + configuration.setProperty("virtualhosts.virtualhost.test.store.delegate", TestableMemoryMessageStore.class.getName()); + _registry = new TestApplicationRegistry(new ServerConfiguration(configuration)); ApplicationRegistry.initialise(_registry); _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); @@ -96,7 +102,7 @@ public class InternalBrokerBaseCase extends TestCase protected void checkStoreContents(int messageCount) { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _transactionLog).getMessageMetaDataMap().size()); + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestTransactionLog) _transactionLog).getMessageMetaDataSize()); //The above publish message is sufficiently small not to fit in the header so no Body is required. //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index 22bd3b5aab..8c2508b8f4 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.util; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -38,9 +37,9 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.transactionlog.TestableTransactionLog; import java.util.Collection; -import java.util.HashMap; import java.util.Properties; import java.util.Arrays; @@ -83,7 +82,7 @@ public class TestApplicationRegistry extends ApplicationRegistry _managedObjectRegistry = new NoopManagedObjectRegistry(); - _transactionLog = new TestableMemoryMessageStore(); + _transactionLog = new TestableTransactionLog(new TestableMemoryMessageStore().configure()); _virtualHostRegistry = new VirtualHostRegistry(); |
