summaryrefslogtreecommitdiff
path: root/java/broker/src/test
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-03 13:26:55 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-03 13:26:55 +0000
commitd62a09ce7b5e3c7a66ec85c4064d40e012552b0c (patch)
tree1364a8757dddef44ad897ebb61dfed30993722eb /java/broker/src/test
parent7c604ebe2d81a733df8a25a7c2b330e1556d3013 (diff)
downloadqpid-python-d62a09ce7b5e3c7a66ec85c4064d40e012552b0c.tar.gz
QPID-1764 : Update to BaseTransactionLog to create a TestableTransactionLog, which will replace TestableMessageStore. Update to BaseTransactionLog/Test to work correctly with transactions and to fully test that functionality. Updated StoreContext to know when it is in a transaction as relying on a payload being set is not sufficient as that is not set when running with the MessageMemoryStore and so transactional testing in the BTLT was not correct.
Update to Virtualhost to correctly set the RoutingTable when the specified TransactionLog is wrapped in a BaseTransactionLog. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@761670 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/test')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java1
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java177
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java89
4 files changed, 217 insertions, 53 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index d4b1de29b2..d5e873ebc0 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -390,6 +390,7 @@ public class SimpleAMQQueueTest extends TestCase
{
sendMessage(txnContext);
+ // This check may be too soon as a purging thread may be required to bring the queue back under quota.
long usage = _queue.getMemoryUsageCurrent();
assertTrue("Queue has gone over quota:" + usage,
usage <= _queue.getMemoryUsageMaximum());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
index bb051693c3..38d139e94c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
@@ -21,11 +21,12 @@
package org.apache.qpid.server.store;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.Map;
import java.util.List;
-public interface TestTransactionLog
+public interface TestTransactionLog extends TransactionLog
{
public List<AMQQueue> getMessageReferenceMap(Long messageID);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
index d3294d4c10..0a2a1c2327 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class BaseTransactionLogTest extends TestCase implements TransactionLog
@@ -46,14 +47,14 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>();
final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>();
- BaseTransactionLog _transactionLog;
+ TestableTransactionLog _transactionLog;
private ArrayList<AMQQueue> _queues;
private MockPersistentAMQMessage _message;
public void setUp() throws Exception
{
super.setUp();
- _transactionLog = new BaseTransactionLog(this);
+ _transactionLog = new TestableTransactionLog(this);
}
public void testSingleEnqueueNoTransactional() throws AMQException
@@ -87,11 +88,9 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
// Enqueue a message to dequeue
testSingleEnqueueNoTransactional();
- _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+ _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
- assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ verifyMessageRemoved(_message.getMessageId());
}
public void testSingleEnqueueTransactional() throws AMQException
@@ -137,16 +136,13 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
_transactionLog.beginTran(context);
- _transactionLog.dequeueMessage(context,_queues.get(0), _message.getMessageId());
+ _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId());
_transactionLog.commitTran(context);
- assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ verifyMessageRemoved(_message.getMessageId());
}
-
public void testMultipleEnqueueNoTransactional() throws AMQException
{
//Store Data
@@ -185,34 +181,54 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
// Enqueue a message to dequeue
testMultipleEnqueueNoTransactional();
- _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+ _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
- assertNotNull("Message not enqueued", enqueued);
- assertFalse("Message still enqueued on the first queue,",enqueued.contains(_queues.get(0)));
- assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
- assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
- assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+ assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+ _queues.remove(0);
+
+ verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+ verifyMessageStored(_message.getMessageId());
+ _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
- _transactionLog.dequeueMessage(new StoreContext(),_queues.get(1), _message.getMessageId());
+ assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+ _queues.remove(0);
- enqueued = _enqueues.get(_message.getMessageId());
- assertNotNull("Message not enqueued", enqueued);
- assertFalse("Message still enqueued on the second queue,",enqueued.contains(_queues.get(1)));
- assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
-
- assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
- assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+ ArrayList<AMQQueue> enqueues = _enqueues.get(_message.getMessageId());
- _transactionLog.dequeueMessage(new StoreContext(),_queues.get(2), _message.getMessageId());
+ assertNotNull("Message not enqueued", enqueues);
+ assertEquals("Message is not enqueued on the right number of queues", _queues.size(), enqueues.size());
+ for (AMQQueue queue : _queues)
+ {
+ assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue));
+ }
+
+ //Use the reference map to ensure that we are enqueuing the right number of messages
+ List<AMQQueue> references = _transactionLog.getMessageReferenceMap(_message.getMessageId());
+
+ assertNotNull("Message not enqueued", references);
+ assertEquals("Message is not enqueued on the right number of queues", _queues.size(), references.size());
+ for (AMQQueue queue : references)
+ {
+ assertTrue("Message not enqueued on:" + queue, references.contains(queue));
+ }
- assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ verifyMessageStored(_message.getMessageId());
+
+ _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+
+ verifyMessageRemoved(_message.getMessageId());
}
+ private void verifyMessageRemoved(Long messageID)
+ {
+ assertNull("Message references exist", _transactionLog.getMessageReferenceMap(messageID));
+ assertNull("Message enqueued", _enqueues.get(messageID));
+ assertNull("Message chunks enqueued", _storeChunks.get(messageID));
+ assertNull("Message meta data enqueued", _storeMetaData.get(messageID));
+ }
public void testMultipleEnqueueTransactional() throws AMQException
{
@@ -294,12 +310,10 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
_transactionLog.commitTran(context);
- assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ verifyMessageRemoved(_message.getMessageId());
}
- public void testMultipleDequeueSingleTransaction() throws AMQException
+ public void testMultipleDequeueSingleTransaction() throws AMQException
{
// Enqueue a message to dequeue
testMultipleEnqueueTransactional();
@@ -318,10 +332,8 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
-
_transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId());
-
enqueued = _enqueues.get(_message.getMessageId());
assertNotNull("Message not enqueued", enqueued);
assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1)));
@@ -330,14 +342,11 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
-
_transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId());
_transactionLog.commitTran(context);
- assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ verifyMessageRemoved(_message.getMessageId());
}
private void verifyMessageStored(Long messageId)
@@ -356,6 +365,23 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
{
assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue));
}
+
+ //Use the reference map to ensure that we are enqueuing the right number of messages
+ List<AMQQueue> references = _transactionLog.getMessageReferenceMap(messageId);
+
+ if (queues.size() == 1)
+ {
+ assertNull("Message has an enqueued list", references);
+ }
+ else
+ {
+ assertNotNull("Message not enqueued", references);
+ assertEquals("Message is not enqueued on the right number of queues", queues.size(), references.size());
+ for (AMQQueue queue : references)
+ {
+ assertTrue("Message not enqueued on:" + queue, references.contains(queue));
+ }
+ }
}
/*************************** TransactionLog *******************************
@@ -419,19 +445,42 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
if (queues == null)
{
- throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
- "queue(" + queue + ") but no enqueue data available");
- }
+ boolean found = false;
+ // If we are in a transaction we may have already done the dequeue.
+ if (context.inTransaction())
+ {
- synchronized (queues)
- {
- if (!queues.contains(queue))
+ for (Object record : (ArrayList) context.getPayload())
+ {
+ if (record instanceof RemoveRecord)
+ {
+ if (((RemoveRecord) record)._messageId.equals(messageId))
+ {
+ found = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!found)
{
throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
- "queue(" + queue + ") but no message not enqueued on queue");
+ "queue(" + queue + ") but no enqueue data available");
+ }
+ }
+ else
+ {
+ synchronized (queues)
+ {
+ if (!queues.contains(queue))
+ {
+ throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
+ "queue(" + queue + ") but no message not enqueued on queue");
+ }
+
+ queues.remove(queue);
}
-
- queues.remove(queue);
}
}
@@ -450,21 +499,29 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
"no enqueue data available");
}
- if (!queues.isEmpty())
+ if (queues.size() > 1)
{
throw new RuntimeException("Removed a message(" + messageId + ") that still had references.");
}
+ MessageMetaData mmd;
synchronized (_storeMetaData)
{
- _storeMetaData.remove(messageId);
+ mmd = _storeMetaData.remove(messageId);
}
+ ArrayList<ContentChunk> chunks;
synchronized (_storeChunks)
{
- _storeChunks.remove(messageId);
+ chunks = _storeChunks.remove(messageId);
}
+ //Record the remove for part of the transaction
+ if (context.inTransaction())
+ {
+ ArrayList transactionData = (ArrayList) context.getPayload();
+ transactionData.add(new RemoveRecord(messageId, queues, mmd, chunks));
+ }
}
//
@@ -474,7 +531,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
public void beginTran(StoreContext context) throws AMQException
{
- context.setPayload(new Object());
+ context.setPayload(new ArrayList());
}
public void commitTran(StoreContext context) throws AMQException
@@ -532,4 +589,20 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
{
return false;
}
+
+ class RemoveRecord
+ {
+ MessageMetaData _mmd;
+ ArrayList<AMQQueue> _queues;
+ ArrayList<ContentChunk> _chunks;
+ Long _messageId;
+
+ RemoveRecord(Long messageId, ArrayList<AMQQueue> queues, MessageMetaData mmd, ArrayList<ContentChunk> chunks)
+ {
+ _messageId = messageId;
+ _queues = queues;
+ _mmd = mmd;
+ _chunks = chunks;
+ }
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
new file mode 100644
index 0000000000..b0c47052b2
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.routing.RoutingTable;
+
+import java.util.List;
+import java.util.LinkedList;
+
+public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog
+{
+
+ List<Long> _singleEnqueues = new LinkedList<Long>();
+
+ public TestableTransactionLog()
+ {
+ super(null);
+ }
+
+ public TestableTransactionLog(BaseTransactionLog delegate)
+ {
+ super(delegate.getDelegate());
+ }
+
+ public TestableTransactionLog(TransactionLog delegate)
+ {
+ super(delegate);
+ }
+
+
+ @Override
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ if (_delegate != null)
+ {
+ TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config);
+
+ // Unwrap any BaseTransactionLog
+ if (configuredLog instanceof BaseTransactionLog)
+ {
+ _delegate = ((BaseTransactionLog)configuredLog).getDelegate();
+ }
+ }
+ else
+ {
+ String delegateClass = config.getStoreConfiguration().getString("delegate");
+ Class clazz = Class.forName(delegateClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof TransactionLog))
+ {
+ throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
+ " does not.");
+ }
+ _delegate = (TransactionLog) o;
+
+ // If a TransactionLog uses the BaseTransactionLog then it will return this object.
+ _delegate.configure(virtualHost, base, config);
+ }
+ return this;
+ }
+
+ public List<AMQQueue> getMessageReferenceMap(Long messageID)
+ {
+ return _idToQueues.get(messageID);
+ }
+}