summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java177
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java89
7 files changed, 261 insertions, 88 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
index b5ae8ea284..bb50df139f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -44,6 +44,7 @@ public class StoreContext
private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap;
private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap;
private boolean _async;
+ private boolean _inTransaction;
public StoreContext()
{
@@ -64,6 +65,9 @@ public class StoreContext
{
_name = name;
_async = asynchrouous;
+ _inTransaction = false;
+ _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+ _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();
}
public StoreContext(boolean asynchronous)
@@ -82,7 +86,7 @@ public class StoreContext
{
_logger.debug("public void setPayload(Object payload = " + payload + "): called");
}
- _payload = payload;
+ _payload = payload;
}
/**
@@ -137,7 +141,7 @@ public class StoreContext
}
/**
- * Record the dequeue for processing on commit
+ * Record the dequeue for processing after the commit
*
* @param queue
* @param messageId
@@ -146,39 +150,37 @@ public class StoreContext
*/
public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException
{
- if (inTransaction())
- {
- ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
+ ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
- if (dequeues == null)
- {
- dequeues = new ArrayList<AMQQueue>();
- _dequeueMap.put(messageId, dequeues);
- }
-
- dequeues.add(queue);
+ if (dequeues == null)
+ {
+ dequeues = new ArrayList<AMQQueue>();
+ _dequeueMap.put(messageId, dequeues);
}
+
+ dequeues.add(queue);
}
public void beginTransaction() throws AMQException
{
- _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
- _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+ _inTransaction = true;
}
public void commitTransaction() throws AMQException
{
_dequeueMap.clear();
+ _inTransaction = false;
}
public void abortTransaction() throws AMQException
{
_enqueueMap.clear();
+ _inTransaction = false;
}
public boolean inTransaction()
{
- return _payload != null;
+ return _inTransaction; // _payload != null;
}
public boolean isAsync()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
index 4c3f1fcc49..973ecd6c09 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
@@ -39,7 +39,7 @@ public class BaseTransactionLog implements TransactionLog
private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class);
TransactionLog _delegate;
- private Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
+ protected Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
public BaseTransactionLog(TransactionLog delegate)
{
@@ -60,7 +60,7 @@ public class BaseTransactionLog implements TransactionLog
{
context.enqueueMessage(queues, messageId);
- if (queues.size() > 0)
+ if (queues.size() > 1)
{
_logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
@@ -73,10 +73,10 @@ public class BaseTransactionLog implements TransactionLog
public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
{
+ context.dequeueMessage(queue, messageId);
+
if (context.inTransaction())
{
- context.dequeueMessage(queue, messageId);
-
Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
//For each Message ID that is in the map check
@@ -97,11 +97,7 @@ public class BaseTransactionLog implements TransactionLog
if (!context.inTransaction())
{
- HashMap<Long, ArrayList<AMQQueue>> dequeue = new HashMap<Long, ArrayList<AMQQueue>>();
- ArrayList list = new ArrayList();
- list.add(queue);
- dequeue.put(messageId, list);
- processDequeues(dequeue);
+ processDequeues(context.getDequeueMap());
}
}
@@ -128,11 +124,7 @@ public class BaseTransactionLog implements TransactionLog
//Perform real commit of current data
_delegate.commitTran(context);
- // If we have dequeues to process then process them
- if (context.getDequeueMap() != null)
- {
- processDequeues(context.getDequeueMap());
- }
+ processDequeues(context.getDequeueMap());
//Commit the recorded state for this transaction.
context.commitTransaction();
@@ -141,10 +133,8 @@ public class BaseTransactionLog implements TransactionLog
public void abortTran(StoreContext context) throws AMQException
{
// If we have enqueues to rollback
- if (context.getEnqueueMap() != null)
- {
- processDequeues(context.getEnqueueMap());
- }
+ processDequeues(context.getEnqueueMap());
+
//Abort the recorded state for this transaction.
context.abortTransaction();
@@ -154,6 +144,12 @@ public class BaseTransactionLog implements TransactionLog
private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap)
throws AMQException
{
+ // Check we have dequeues to process then process them
+ if (messageMap == null || messageMap.isEmpty())
+ {
+ return;
+ }
+
// Process any enqueues to bring our model up to date.
Set<Long> messageIDs = messageMap.keySet();
@@ -190,6 +186,8 @@ public class BaseTransactionLog implements TransactionLog
if (enqueuedList.isEmpty())
{
_delegate.removeMessage(removeContext, messageID);
+ //Remove references list
+ _idToQueues.remove(messageID);
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 7bcfb9f59a..dc12d97557 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -295,14 +295,23 @@ public class VirtualHost implements Accessable
}
_transactionLog = (TransactionLog) o;
+ // If a TransactionLog uses the BaseTransactionLog then it will return this object.
+ _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config);
+
//Assign RoutingTable as old MessageStores converted to TransactionLog will require the _routingTable.
if (_transactionLog instanceof RoutingTable)
{
_routingTable = (RoutingTable) _transactionLog;
}
+ else if (_transactionLog instanceof BaseTransactionLog)
+ {
+ TransactionLog delegate = ((BaseTransactionLog) _transactionLog).getDelegate();
+ if (delegate instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable) delegate;
+ }
+ }
- // If a TransactionLog uses the BaseTransactionLog then it will return this object.
- _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config);
}
//todo we need to move from store.class to transactionlog.class
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index d4b1de29b2..d5e873ebc0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -390,6 +390,7 @@ public class SimpleAMQQueueTest extends TestCase
{
sendMessage(txnContext);
+ // This check may be too soon as a purging thread may be required to bring the queue back under quota.
long usage = _queue.getMemoryUsageCurrent();
assertTrue("Queue has gone over quota:" + usage,
usage <= _queue.getMemoryUsageMaximum());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
index bb051693c3..38d139e94c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
@@ -21,11 +21,12 @@
package org.apache.qpid.server.store;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.Map;
import java.util.List;
-public interface TestTransactionLog
+public interface TestTransactionLog extends TransactionLog
{
public List<AMQQueue> getMessageReferenceMap(Long messageID);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
index d3294d4c10..0a2a1c2327 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class BaseTransactionLogTest extends TestCase implements TransactionLog
@@ -46,14 +47,14 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>();
final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>();
- BaseTransactionLog _transactionLog;
+ TestableTransactionLog _transactionLog;
private ArrayList<AMQQueue> _queues;
private MockPersistentAMQMessage _message;
public void setUp() throws Exception
{
super.setUp();
- _transactionLog = new BaseTransactionLog(this);
+ _transactionLog = new TestableTransactionLog(this);
}
public void testSingleEnqueueNoTransactional() throws AMQException
@@ -87,11 +88,9 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
// Enqueue a message to dequeue
testSingleEnqueueNoTransactional();
- _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+ _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
- assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ verifyMessageRemoved(_message.getMessageId());
}
public void testSingleEnqueueTransactional() throws AMQException
@@ -137,16 +136,13 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
_transactionLog.beginTran(context);
- _transactionLog.dequeueMessage(context,_queues.get(0), _message.getMessageId());
+ _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId());
_transactionLog.commitTran(context);
- assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ verifyMessageRemoved(_message.getMessageId());
}
-
public void testMultipleEnqueueNoTransactional() throws AMQException
{
//Store Data
@@ -185,34 +181,54 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
// Enqueue a message to dequeue
testMultipleEnqueueNoTransactional();
- _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+ _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
- assertNotNull("Message not enqueued", enqueued);
- assertFalse("Message still enqueued on the first queue,",enqueued.contains(_queues.get(0)));
- assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
- assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
- assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+ assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+ _queues.remove(0);
+
+ verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+ verifyMessageStored(_message.getMessageId());
+ _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
- _transactionLog.dequeueMessage(new StoreContext(),_queues.get(1), _message.getMessageId());
+ assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+ _queues.remove(0);
- enqueued = _enqueues.get(_message.getMessageId());
- assertNotNull("Message not enqueued", enqueued);
- assertFalse("Message still enqueued on the second queue,",enqueued.contains(_queues.get(1)));
- assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
-
- assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
- assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+ ArrayList<AMQQueue> enqueues = _enqueues.get(_message.getMessageId());
- _transactionLog.dequeueMessage(new StoreContext(),_queues.get(2), _message.getMessageId());
+ assertNotNull("Message not enqueued", enqueues);
+ assertEquals("Message is not enqueued on the right number of queues", _queues.size(), enqueues.size());
+ for (AMQQueue queue : _queues)
+ {
+ assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue));
+ }
+
+ //Use the reference map to ensure that we are enqueuing the right number of messages
+ List<AMQQueue> references = _transactionLog.getMessageReferenceMap(_message.getMessageId());
+
+ assertNotNull("Message not enqueued", references);
+ assertEquals("Message is not enqueued on the right number of queues", _queues.size(), references.size());
+ for (AMQQueue queue : references)
+ {
+ assertTrue("Message not enqueued on:" + queue, references.contains(queue));
+ }
- assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ verifyMessageStored(_message.getMessageId());
+
+ _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+
+ verifyMessageRemoved(_message.getMessageId());
}
+ private void verifyMessageRemoved(Long messageID)
+ {
+ assertNull("Message references exist", _transactionLog.getMessageReferenceMap(messageID));
+ assertNull("Message enqueued", _enqueues.get(messageID));
+ assertNull("Message chunks enqueued", _storeChunks.get(messageID));
+ assertNull("Message meta data enqueued", _storeMetaData.get(messageID));
+ }
public void testMultipleEnqueueTransactional() throws AMQException
{
@@ -294,12 +310,10 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
_transactionLog.commitTran(context);
- assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ verifyMessageRemoved(_message.getMessageId());
}
- public void testMultipleDequeueSingleTransaction() throws AMQException
+ public void testMultipleDequeueSingleTransaction() throws AMQException
{
// Enqueue a message to dequeue
testMultipleEnqueueTransactional();
@@ -318,10 +332,8 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
-
_transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId());
-
enqueued = _enqueues.get(_message.getMessageId());
assertNotNull("Message not enqueued", enqueued);
assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1)));
@@ -330,14 +342,11 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
-
_transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId());
_transactionLog.commitTran(context);
- assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
- assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+ verifyMessageRemoved(_message.getMessageId());
}
private void verifyMessageStored(Long messageId)
@@ -356,6 +365,23 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
{
assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue));
}
+
+ //Use the reference map to ensure that we are enqueuing the right number of messages
+ List<AMQQueue> references = _transactionLog.getMessageReferenceMap(messageId);
+
+ if (queues.size() == 1)
+ {
+ assertNull("Message has an enqueued list", references);
+ }
+ else
+ {
+ assertNotNull("Message not enqueued", references);
+ assertEquals("Message is not enqueued on the right number of queues", queues.size(), references.size());
+ for (AMQQueue queue : references)
+ {
+ assertTrue("Message not enqueued on:" + queue, references.contains(queue));
+ }
+ }
}
/*************************** TransactionLog *******************************
@@ -419,19 +445,42 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
if (queues == null)
{
- throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
- "queue(" + queue + ") but no enqueue data available");
- }
+ boolean found = false;
+ // If we are in a transaction we may have already done the dequeue.
+ if (context.inTransaction())
+ {
- synchronized (queues)
- {
- if (!queues.contains(queue))
+ for (Object record : (ArrayList) context.getPayload())
+ {
+ if (record instanceof RemoveRecord)
+ {
+ if (((RemoveRecord) record)._messageId.equals(messageId))
+ {
+ found = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!found)
{
throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
- "queue(" + queue + ") but no message not enqueued on queue");
+ "queue(" + queue + ") but no enqueue data available");
+ }
+ }
+ else
+ {
+ synchronized (queues)
+ {
+ if (!queues.contains(queue))
+ {
+ throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
+ "queue(" + queue + ") but no message not enqueued on queue");
+ }
+
+ queues.remove(queue);
}
-
- queues.remove(queue);
}
}
@@ -450,21 +499,29 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
"no enqueue data available");
}
- if (!queues.isEmpty())
+ if (queues.size() > 1)
{
throw new RuntimeException("Removed a message(" + messageId + ") that still had references.");
}
+ MessageMetaData mmd;
synchronized (_storeMetaData)
{
- _storeMetaData.remove(messageId);
+ mmd = _storeMetaData.remove(messageId);
}
+ ArrayList<ContentChunk> chunks;
synchronized (_storeChunks)
{
- _storeChunks.remove(messageId);
+ chunks = _storeChunks.remove(messageId);
}
+ //Record the remove for part of the transaction
+ if (context.inTransaction())
+ {
+ ArrayList transactionData = (ArrayList) context.getPayload();
+ transactionData.add(new RemoveRecord(messageId, queues, mmd, chunks));
+ }
}
//
@@ -474,7 +531,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
public void beginTran(StoreContext context) throws AMQException
{
- context.setPayload(new Object());
+ context.setPayload(new ArrayList());
}
public void commitTran(StoreContext context) throws AMQException
@@ -532,4 +589,20 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
{
return false;
}
+
+ class RemoveRecord
+ {
+ MessageMetaData _mmd;
+ ArrayList<AMQQueue> _queues;
+ ArrayList<ContentChunk> _chunks;
+ Long _messageId;
+
+ RemoveRecord(Long messageId, ArrayList<AMQQueue> queues, MessageMetaData mmd, ArrayList<ContentChunk> chunks)
+ {
+ _messageId = messageId;
+ _queues = queues;
+ _mmd = mmd;
+ _chunks = chunks;
+ }
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
new file mode 100644
index 0000000000..b0c47052b2
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.routing.RoutingTable;
+
+import java.util.List;
+import java.util.LinkedList;
+
+public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog
+{
+
+ List<Long> _singleEnqueues = new LinkedList<Long>();
+
+ public TestableTransactionLog()
+ {
+ super(null);
+ }
+
+ public TestableTransactionLog(BaseTransactionLog delegate)
+ {
+ super(delegate.getDelegate());
+ }
+
+ public TestableTransactionLog(TransactionLog delegate)
+ {
+ super(delegate);
+ }
+
+
+ @Override
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ if (_delegate != null)
+ {
+ TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config);
+
+ // Unwrap any BaseTransactionLog
+ if (configuredLog instanceof BaseTransactionLog)
+ {
+ _delegate = ((BaseTransactionLog)configuredLog).getDelegate();
+ }
+ }
+ else
+ {
+ String delegateClass = config.getStoreConfiguration().getString("delegate");
+ Class clazz = Class.forName(delegateClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof TransactionLog))
+ {
+ throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
+ " does not.");
+ }
+ _delegate = (TransactionLog) o;
+
+ // If a TransactionLog uses the BaseTransactionLog then it will return this object.
+ _delegate.configure(virtualHost, base, config);
+ }
+ return this;
+ }
+
+ public List<AMQQueue> getMessageReferenceMap(Long messageID)
+ {
+ return _idToQueues.get(messageID);
+ }
+}