summaryrefslogtreecommitdiff
path: root/java/broker/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/test')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java65
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java17
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java77
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java120
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java35
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java20
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java (renamed from java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java)28
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java143
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java4
11 files changed, 284 insertions, 232 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index 1d729a82a5..52b8b0ad19 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.queue.AMQMessage;
@@ -37,9 +38,10 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.TransientAMQMessage;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -81,20 +83,6 @@ public class TxAckTest extends TestCase
combined.stop();
}
- public void testPrepare() throws AMQException
- {
- individual.prepare();
- multiple.prepare();
- combined.prepare();
- }
-
- public void testUndoPrepare() throws AMQException
- {
- individual.undoPrepare();
- multiple.undoPrepare();
- combined.undoPrepare();
- }
-
public void testCommit() throws AMQException
{
individual.commit();
@@ -115,12 +103,13 @@ public class TxAckTest extends TestCase
private final List<Long> _unacked;
private StoreContext _storeContext = new StoreContext();
private AMQQueue _queue;
+ private TransactionLog _transactionLog = new TestableMemoryMessageStore();
private static final int MESSAGE_SIZE=100;
Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
{
- TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
+ TransactionalContext txnContext = new NonTransactionalContext(_transactionLog,
_storeContext, null,
new LinkedList<RequiredDeliveryException>()
);
@@ -138,12 +127,15 @@ public class TxAckTest extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- AMQMessage message = new TestMessage(deliveryTag, info);
+ AMQMessage message = new TestMessage(deliveryTag, info, (TestTransactionLog) _transactionLog);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
message.setPublishAndContentHeaderBody(_storeContext, info, header);
+
+
+
_map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
}
_acked = acked;
@@ -165,25 +157,6 @@ public class TxAckTest extends TestCase
}
}
- void prepare() throws AMQException
- {
- _op.consolidate();
- _op.prepare(_storeContext);
-
- assertCount(_acked, -1);
- assertCount(_unacked, 0);
-
- }
-
- void undoPrepare()
- {
- _op.consolidate();
- _op.undoPrepare();
-
- assertCount(_acked, 1);
- assertCount(_unacked, 0);
- }
-
void commit()
{
_op.consolidate();
@@ -232,30 +205,22 @@ public class TxAckTest extends TestCase
private class TestMessage extends TransientAMQMessage
{
private final long _tag;
- private int _count;
+ private TestTransactionLog _transactionLog;
- TestMessage(long tag, MessagePublishInfo publishBody)
+ public TestMessage(long tag, MessagePublishInfo publishBody, TestTransactionLog transactionLog)
throws AMQException
{
super(createMessage( publishBody));
_tag = tag;
+ _transactionLog = transactionLog;
}
- public boolean incrementReference(int count)
- {
- _count+=count;
- return true;
- }
-
- public void decrementReference(StoreContext context)
- {
- _count--;
- }
-
void assertCountEquals(int expected)
{
- assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ List<AMQQueue> list = _transactionLog.getMessageReferenceMap(_messageId);
+ int actual = (list == null ? 0 : list.size());
+ assertEquals("Wrong count for message with tag " + _tag, expected, actual);
}
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 40b08a2e39..78cf610f28 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -324,7 +324,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 98465eda20..9f8d5f9a99 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -30,14 +30,16 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
@@ -57,7 +59,7 @@ public class AckTest extends TestCase
private MockProtocolSession _protocolSession;
- private TestMemoryMessageStore _messageStore;
+ private TestableMemoryMessageStore _messageStore;
private StoreContext _storeContext = new StoreContext();
@@ -72,14 +74,15 @@ public class AckTest extends TestCase
super.setUp();
ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
- _messageStore = new TestMemoryMessageStore();
+ VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = new TestableMemoryMessageStore((MemoryMessageStore)vhost.getTransactionLog());
_protocolSession = new MockProtocolSession(_messageStore);
_channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
_protocolSession.addChannel(_channel);
- _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
- null);
+ _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"),
+ true, vhost, null);
}
protected void tearDown()
@@ -185,7 +188,7 @@ public class AckTest extends TestCase
/**
* Tests that in no-ack mode no messages are retained
*/
- public void testPersistentNoAckMode() throws AMQException
+ public void testPersistentNoAckMode() throws AMQException, InterruptedException
{
// false arg means no acks expected
_subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
@@ -194,7 +197,7 @@ public class AckTest extends TestCase
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
+ assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0);
assertTrue(_messageStore.getContentBodyMap().size() == 0);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
deleted file mode 100644
index 44e9851db7..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import junit.framework.TestCase;
-
-public class MessageReferenceCountingTest extends TestCase
-{
- AMQMessage _message;
-
- public void setUp()
- {
- _message = MessageFactory.getInstance().createMessage(null, false);
- }
-
- public void testInitialState()
- {
-
- assertTrue("New messages should have a reference", _message.isReferenced());
- }
-
- public void testIncrementReference()
- {
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- assertTrue("Incrementing should be allowed ",_message.incrementReference(1));
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1));
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2));
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- }
-
- public void testDecrementReference()
- {
- assertTrue("Message should maintain Referenced state", _message.isReferenced());
- try
- {
- _message.decrementReference(null);
- }
- catch (MessageCleanupException e)
- {
- fail("Decrement should be allowed:"+e.getMessage());
- }
-
- assertFalse("Message should not be Referenced state", _message.isReferenced());
-
- try
- {
- _message.decrementReference(null);
- fail("Decrement should not be allowed as we should have a ref count of 0");
- }
- catch (MessageCleanupException e)
- {
- assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0"));
- }
-
- }
-
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 12ff91cdad..92235648ec 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -38,4 +38,9 @@ public class MockQueueEntry extends QueueEntryImpl
{
super(_defaultList, message);
}
+
+ public MockQueueEntry(AMQMessage message, SimpleAMQQueue queue)
+ {
+ super(new SimpleQueueEntryList(queue) ,message);
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
index fdaf2c309f..4551ae5af8 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
@@ -20,18 +20,46 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
public class PersistentMessageTest extends TransientMessageTest
{
- private MemoryMessageStore _messageStore;
+ private TestableMemoryMessageStore _messageStore;
+
+ protected SimpleAMQQueue _queue;
+ protected AMQShortString _q1name = new AMQShortString("q1name");
+ protected AMQShortString _owner = new AMQShortString("owner");
+ protected AMQShortString _routingKey = new AMQShortString("routing key");
+ private TransactionalContext _messageDeliveryContext;
+ private static final long MESSAGE_SIZE = 0L;
+ private List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
- public void setUp()
+ public void setUp() throws Exception
{
- _messageStore = new MemoryMessageStore();
- _messageStore.configure();
+ _messageStore = new TestableMemoryMessageStore();
+
_storeContext = new StoreContext();
+ VirtualHost vhost = new VirtualHost(PersistentMessageTest.class.getName(), _messageStore);
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null);
+ // Create IncomingMessage and nondurable queue
+ _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages);
+
}
@Override
@@ -47,4 +75,86 @@ public class PersistentMessageTest extends TransientMessageTest
assertTrue(_message.isPersistent());
}
+ /**
+ * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and
+ * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right
+ * contents. TransactionLog = Empty, returnMessages 1 item.
+ *
+ * @throws Exception
+ */
+ public void testImmediateReturnNotInLog() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl(null, true, false, null);
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_messageStore);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it was not enqueued
+ List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId);
+ assertNull("TransactionLog contains a queue reference for this messageID:" + messageId, queueList);
+ checkMessageMetaDataRemoved(messageId);
+
+ assertEquals("Return message count not correct", 1, _returnMessages.size());
+ }
+
+ protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
+ {
+ IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
+ new MockProtocolSession(_messageStore), _messageStore);
+
+ // equivalent to amqChannel.publishContenHeader
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ // This message has no bodies
+ contentHeaderBody.bodySize = MESSAGE_SIZE;
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+
+ msg.setContentHeaderBody(contentHeaderBody);
+ msg.setExpiration();
+
+ return msg;
+ }
+
+ protected void checkMessageMetaDataExists(long messageId)
+ {
+ try
+ {
+ _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+ }
+ catch (AMQException amqe)
+ {
+ fail("Message MetaData does not exist for message:" + messageId);
+ }
+ }
+
+ protected void checkMessageMetaDataRemoved(long messageId)
+ {
+ try
+ {
+ assertNull("Message MetaData still exists for message:" + messageId,
+ _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
+ assertNull("Message still has values in the reference map:" + messageId,
+ _messageStore.getMessageReferenceMap(messageId));
+
+ }
+ catch (AMQException e)
+ {
+ fail("AMQE thrown whilst trying to getMessageMetaData:" + e.getMessage());
+ }
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 665ca089da..7a97837208 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -36,10 +36,12 @@ import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestTransactionLog;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.ArrayList;
import java.util.List;
@@ -319,7 +321,7 @@ public class SimpleAMQQueueTest extends TestCase
{
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(info, txnContext, null, _store);
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
@@ -338,21 +340,22 @@ public class SimpleAMQQueueTest extends TestCase
_store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
- AMQQueue data = _store.getMessages().get(messageId);
+ List<AMQQueue> data = _store.getMessageReferenceMap(messageId);
assertNotNull(data);
// Dequeue message
-
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
- MockQueueEntry entry = new MockQueueEntry(message);
- _queue.dequeue(null, entry);
+ MockQueueEntry entry = new MockQueueEntry(message, _queue);
+ entry.getQueueEntryList().add(message);
+ entry.acquire();
+ entry.dequeue(null);
// Check that it is dequeued
- data = _store.getMessages().get(messageId);
+ data = _store.getMessageReferenceMap(messageId);
assertNull(data);
}
@@ -381,7 +384,7 @@ public class SimpleAMQQueueTest extends TestCase
public AMQMessage createMessage() throws AMQException
{
- AMQMessage message = new TestMessage(info);
+ AMQMessage message = new TestMessage(info, _store);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
@@ -397,29 +400,21 @@ public class SimpleAMQQueueTest extends TestCase
public class TestMessage extends TransientAMQMessage
{
private final long _tag;
- private int _count;
+ private TestTransactionLog _transactionLog;
- TestMessage(MessagePublishInfo publishBody)
+ TestMessage(MessagePublishInfo publishBody, TestTransactionLog transactionLog)
throws AMQException
{
super(SimpleAMQQueueTest.createMessage(publishBody));
_tag = getMessageId();
+ _transactionLog = transactionLog;
}
- public boolean incrementReference(int count)
- {
- _count+=count;
- return true;
- }
-
- public void decrementReference(StoreContext context)
- {
- _count--;
- }
void assertCountEquals(int expected)
{
- assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+ assertEquals("Wrong count for message with tag " + _tag, expected,
+ _transactionLog.getMessageReferenceMap(_messageId).size());
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index e8acfc2fda..5a4c435e59 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -34,7 +34,7 @@ import org.apache.qpid.server.queue.AMQMessage;
*/
public class TestReferenceCounting extends TestCase
{
- private TestMemoryMessageStore _store;
+ private TestableMemoryMessageStore _store;
private StoreContext _storeContext = new StoreContext();
@@ -42,7 +42,7 @@ public class TestReferenceCounting extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _store = new TestMemoryMessageStore();
+ _store = new TestableMemoryMessageStore();
}
/**
@@ -54,19 +54,9 @@ public class TestReferenceCounting extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- final long messageId = _store.getNewMessageId();
-
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message.incrementReference(1);
-
- // we call routing complete to set up the handle
- // message.routingComplete(_store, _storeContext, new MessageHandleFactory());
-
-
- assertEquals(1, _store.getMessageMetaDataMap().size());
- message.decrementReference(_storeContext);
assertEquals(1, _store.getMessageMetaDataMap().size());
}
@@ -84,16 +74,10 @@ public class TestReferenceCounting extends TestCase
MessagePublishInfo info = new MessagePublishInfoImpl();
- final Long messageId = _store.getNewMessageId();
final ContentHeaderBody chb = createPersistentContentHeader();
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message.incrementReference(1);
-
- assertEquals(1, _store.getMessageMetaDataMap().size());
- message.incrementReference(1);
- message.decrementReference(_storeContext);
assertEquals(1, _store.getMessageMetaDataMap().size());
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
index 4e48435962..bb051693c3 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
@@ -20,32 +20,12 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.queue.AMQQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
import java.util.List;
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
-public class TestMemoryMessageStore extends MemoryMessageStore
+public interface TestTransactionLog
{
- public TestMemoryMessageStore()
- {
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
- }
-
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
- {
- return _metaDataMap;
- }
-
- public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
- {
- return _contentBodyMap;
- }
+ public List<AMQQueue> getMessageReferenceMap(Long messageID);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 9146fe88ae..882f88b8f3 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -23,22 +23,28 @@ package org.apache.qpid.server.store;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Adds some extra methods to the memory message store for testing purposes.
*/
-public class TestableMemoryMessageStore extends MemoryMessageStore
+public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable
{
MemoryMessageStore _mms = null;
- private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
public TestableMemoryMessageStore(MemoryMessageStore mms)
{
@@ -47,46 +53,127 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
public TestableMemoryMessageStore()
{
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+ _mms = new MemoryMessageStore();
+ _mms.configure();
}
public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
{
- if (_mms != null)
- {
- return _mms._metaDataMap;
- }
- else
- {
- return _metaDataMap;
- }
+ return _mms._metaDataMap;
}
public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
{
- if (_mms != null)
- {
- return _mms._contentBodyMap;
- }
- else
- {
- return _contentBodyMap;
- }
+ return _mms._contentBodyMap;
}
-
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+
+ public List<AMQQueue> getMessageReferenceMap(Long messageId)
+ {
+ return _mms._messageEnqueueMap.get(messageId);
+ }
+
+ public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+ {
+ _mms.configure(virtualHost,base,config);
+ }
+
+ public void close() throws Exception
+ {
+ _mms.close();
+ }
+
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ _mms.createExchange(exchange);
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ _mms.removeExchange(exchange);
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ _mms.bindQueue(exchange,routingKey,queue,args);
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ _mms.unbindQueue(exchange,routingKey,queue,args);
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ _mms.createQueue(queue);
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+ {
+ _mms.createQueue(queue,arguments);
+ }
+
+ public void removeQueue(AMQQueue queue) throws AMQException
+ {
+ _mms.removeQueue(queue);
+ }
+
+ public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+ {
+ _mms.removeMessage(storeContext, messageId);
+ }
+
+ public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ _mms.enqueueMessage(context,queue,messageId);
+ }
+
+ public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
+ _mms.dequeueMessage(context,queue,messageId);
+ }
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ _mms.beginTran(context);
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ _mms.commitTran(context);
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ _mms.abortTran(context);
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return _mms.inTran(context);
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ {
+ _mms.storeContentBodyChunk(context,messageId,index,contentBody,lastContentBody);
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ {
+ _mms.storeMessageMetaData(context,messageId,messageMetaData);
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
- getMessages().put(messageId, queue);
+ return _mms.getMessageMetaData(context,messageId);
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
- getMessages().remove(messageId);
+ return _mms.getContentBodyChunk(context,messageId,index);
}
- public HashMap<Long, AMQQueue> getMessages()
+ public boolean isPersistent()
{
- return _messages;
+ return _mms.isPersistent();
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
index ca6644d141..26802b4210 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.txn;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.LinkedList;
@@ -194,7 +194,7 @@ public class TxnBufferTest extends TestCase
}
}
- class MockStore extends TestMemoryMessageStore
+ class MockStore extends TestableMemoryMessageStore
{
final Object BEGIN = "BEGIN";
final Object ABORT = "ABORT";