diff options
Diffstat (limited to 'qpid/java')
7 files changed, 261 insertions, 88 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java index b5ae8ea284..bb50df139f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -44,6 +44,7 @@ public class StoreContext private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap; private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap; private boolean _async; + private boolean _inTransaction; public StoreContext() { @@ -64,6 +65,9 @@ public class StoreContext { _name = name; _async = asynchrouous; + _inTransaction = false; + _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>(); + _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>(); } public StoreContext(boolean asynchronous) @@ -82,7 +86,7 @@ public class StoreContext { _logger.debug("public void setPayload(Object payload = " + payload + "): called"); } - _payload = payload; + _payload = payload; } /** @@ -137,7 +141,7 @@ public class StoreContext } /** - * Record the dequeue for processing on commit + * Record the dequeue for processing after the commit * * @param queue * @param messageId @@ -146,39 +150,37 @@ public class StoreContext */ public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException { - if (inTransaction()) - { - ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId); + ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId); - if (dequeues == null) - { - dequeues = new ArrayList<AMQQueue>(); - _dequeueMap.put(messageId, dequeues); - } - - dequeues.add(queue); + if (dequeues == null) + { + dequeues = new ArrayList<AMQQueue>(); + _dequeueMap.put(messageId, dequeues); } + + dequeues.add(queue); } public void beginTransaction() throws AMQException { - _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>(); - _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>(); + _inTransaction = true; } public void commitTransaction() throws AMQException { _dequeueMap.clear(); + _inTransaction = false; } public void abortTransaction() throws AMQException { _enqueueMap.clear(); + _inTransaction = false; } public boolean inTransaction() { - return _payload != null; + return _inTransaction; // _payload != null; } public boolean isAsync() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java index 4c3f1fcc49..973ecd6c09 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java @@ -39,7 +39,7 @@ public class BaseTransactionLog implements TransactionLog private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class); TransactionLog _delegate; - private Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>(); + protected Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>(); public BaseTransactionLog(TransactionLog delegate) { @@ -60,7 +60,7 @@ public class BaseTransactionLog implements TransactionLog { context.enqueueMessage(queues, messageId); - if (queues.size() > 0) + if (queues.size() > 1) { _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues); @@ -73,10 +73,10 @@ public class BaseTransactionLog implements TransactionLog public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException { + context.dequeueMessage(queue, messageId); + if (context.inTransaction()) { - context.dequeueMessage(queue, messageId); - Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap(); //For each Message ID that is in the map check @@ -97,11 +97,7 @@ public class BaseTransactionLog implements TransactionLog if (!context.inTransaction()) { - HashMap<Long, ArrayList<AMQQueue>> dequeue = new HashMap<Long, ArrayList<AMQQueue>>(); - ArrayList list = new ArrayList(); - list.add(queue); - dequeue.put(messageId, list); - processDequeues(dequeue); + processDequeues(context.getDequeueMap()); } } @@ -128,11 +124,7 @@ public class BaseTransactionLog implements TransactionLog //Perform real commit of current data _delegate.commitTran(context); - // If we have dequeues to process then process them - if (context.getDequeueMap() != null) - { - processDequeues(context.getDequeueMap()); - } + processDequeues(context.getDequeueMap()); //Commit the recorded state for this transaction. context.commitTransaction(); @@ -141,10 +133,8 @@ public class BaseTransactionLog implements TransactionLog public void abortTran(StoreContext context) throws AMQException { // If we have enqueues to rollback - if (context.getEnqueueMap() != null) - { - processDequeues(context.getEnqueueMap()); - } + processDequeues(context.getEnqueueMap()); + //Abort the recorded state for this transaction. context.abortTransaction(); @@ -154,6 +144,12 @@ public class BaseTransactionLog implements TransactionLog private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap) throws AMQException { + // Check we have dequeues to process then process them + if (messageMap == null || messageMap.isEmpty()) + { + return; + } + // Process any enqueues to bring our model up to date. Set<Long> messageIDs = messageMap.keySet(); @@ -190,6 +186,8 @@ public class BaseTransactionLog implements TransactionLog if (enqueuedList.isEmpty()) { _delegate.removeMessage(removeContext, messageID); + //Remove references list + _idToQueues.remove(messageID); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 7bcfb9f59a..dc12d97557 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -295,14 +295,23 @@ public class VirtualHost implements Accessable } _transactionLog = (TransactionLog) o; + // If a TransactionLog uses the BaseTransactionLog then it will return this object. + _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config); + //Assign RoutingTable as old MessageStores converted to TransactionLog will require the _routingTable. if (_transactionLog instanceof RoutingTable) { _routingTable = (RoutingTable) _transactionLog; } + else if (_transactionLog instanceof BaseTransactionLog) + { + TransactionLog delegate = ((BaseTransactionLog) _transactionLog).getDelegate(); + if (delegate instanceof RoutingTable) + { + _routingTable = (RoutingTable) delegate; + } + } - // If a TransactionLog uses the BaseTransactionLog then it will return this object. - _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config); } //todo we need to move from store.class to transactionlog.class diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index d4b1de29b2..d5e873ebc0 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -390,6 +390,7 @@ public class SimpleAMQQueueTest extends TestCase { sendMessage(txnContext); + // This check may be too soon as a purging thread may be required to bring the queue back under quota. long usage = _queue.getMemoryUsageCurrent(); assertTrue("Queue has gone over quota:" + usage, usage <= _queue.getMemoryUsageMaximum()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java index bb051693c3..38d139e94c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java @@ -21,11 +21,12 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.Map; import java.util.List; -public interface TestTransactionLog +public interface TestTransactionLog extends TransactionLog { public List<AMQQueue> getMessageReferenceMap(Long messageID); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java index d3294d4c10..0a2a1c2327 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class BaseTransactionLogTest extends TestCase implements TransactionLog @@ -46,14 +47,14 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>(); final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>(); - BaseTransactionLog _transactionLog; + TestableTransactionLog _transactionLog; private ArrayList<AMQQueue> _queues; private MockPersistentAMQMessage _message; public void setUp() throws Exception { super.setUp(); - _transactionLog = new BaseTransactionLog(this); + _transactionLog = new TestableTransactionLog(this); } public void testSingleEnqueueNoTransactional() throws AMQException @@ -87,11 +88,9 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog // Enqueue a message to dequeue testSingleEnqueueNoTransactional(); - _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId()); + _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); - assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); - assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); - assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + verifyMessageRemoved(_message.getMessageId()); } public void testSingleEnqueueTransactional() throws AMQException @@ -137,16 +136,13 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog _transactionLog.beginTran(context); - _transactionLog.dequeueMessage(context,_queues.get(0), _message.getMessageId()); + _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId()); _transactionLog.commitTran(context); - assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); - assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); - assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + verifyMessageRemoved(_message.getMessageId()); } - public void testMultipleEnqueueNoTransactional() throws AMQException { //Store Data @@ -185,34 +181,54 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog // Enqueue a message to dequeue testMultipleEnqueueNoTransactional(); - _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId()); + _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId()); - assertNotNull("Message not enqueued", enqueued); - assertFalse("Message still enqueued on the first queue,",enqueued.contains(_queues.get(0))); - assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size()); - assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); - assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0))); + _queues.remove(0); + + verifyEnqueuedOnQueues(_message.getMessageId(), _queues); + verifyMessageStored(_message.getMessageId()); + _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); - _transactionLog.dequeueMessage(new StoreContext(),_queues.get(1), _message.getMessageId()); + assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0))); + _queues.remove(0); - enqueued = _enqueues.get(_message.getMessageId()); - assertNotNull("Message not enqueued", enqueued); - assertFalse("Message still enqueued on the second queue,",enqueued.contains(_queues.get(1))); - assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size()); - - assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); - assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + ArrayList<AMQQueue> enqueues = _enqueues.get(_message.getMessageId()); - _transactionLog.dequeueMessage(new StoreContext(),_queues.get(2), _message.getMessageId()); + assertNotNull("Message not enqueued", enqueues); + assertEquals("Message is not enqueued on the right number of queues", _queues.size(), enqueues.size()); + for (AMQQueue queue : _queues) + { + assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue)); + } + + //Use the reference map to ensure that we are enqueuing the right number of messages + List<AMQQueue> references = _transactionLog.getMessageReferenceMap(_message.getMessageId()); + + assertNotNull("Message not enqueued", references); + assertEquals("Message is not enqueued on the right number of queues", _queues.size(), references.size()); + for (AMQQueue queue : references) + { + assertTrue("Message not enqueued on:" + queue, references.contains(queue)); + } - assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); - assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); - assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + verifyMessageStored(_message.getMessageId()); + + _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); + + verifyMessageRemoved(_message.getMessageId()); } + private void verifyMessageRemoved(Long messageID) + { + assertNull("Message references exist", _transactionLog.getMessageReferenceMap(messageID)); + assertNull("Message enqueued", _enqueues.get(messageID)); + assertNull("Message chunks enqueued", _storeChunks.get(messageID)); + assertNull("Message meta data enqueued", _storeMetaData.get(messageID)); + } public void testMultipleEnqueueTransactional() throws AMQException { @@ -294,12 +310,10 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog _transactionLog.commitTran(context); - assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); - assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); - assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + verifyMessageRemoved(_message.getMessageId()); } - public void testMultipleDequeueSingleTransaction() throws AMQException + public void testMultipleDequeueSingleTransaction() throws AMQException { // Enqueue a message to dequeue testMultipleEnqueueTransactional(); @@ -318,10 +332,8 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); - _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId()); - enqueued = _enqueues.get(_message.getMessageId()); assertNotNull("Message not enqueued", enqueued); assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1))); @@ -330,14 +342,11 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); - _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId()); _transactionLog.commitTran(context); - assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); - assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); - assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + verifyMessageRemoved(_message.getMessageId()); } private void verifyMessageStored(Long messageId) @@ -356,6 +365,23 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog { assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue)); } + + //Use the reference map to ensure that we are enqueuing the right number of messages + List<AMQQueue> references = _transactionLog.getMessageReferenceMap(messageId); + + if (queues.size() == 1) + { + assertNull("Message has an enqueued list", references); + } + else + { + assertNotNull("Message not enqueued", references); + assertEquals("Message is not enqueued on the right number of queues", queues.size(), references.size()); + for (AMQQueue queue : references) + { + assertTrue("Message not enqueued on:" + queue, references.contains(queue)); + } + } } /*************************** TransactionLog ******************************* @@ -419,19 +445,42 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog if (queues == null) { - throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " + - "queue(" + queue + ") but no enqueue data available"); - } + boolean found = false; + // If we are in a transaction we may have already done the dequeue. + if (context.inTransaction()) + { - synchronized (queues) - { - if (!queues.contains(queue)) + for (Object record : (ArrayList) context.getPayload()) + { + if (record instanceof RemoveRecord) + { + if (((RemoveRecord) record)._messageId.equals(messageId)) + { + found = true; + break; + } + } + } + } + + if (!found) { throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " + - "queue(" + queue + ") but no message not enqueued on queue"); + "queue(" + queue + ") but no enqueue data available"); + } + } + else + { + synchronized (queues) + { + if (!queues.contains(queue)) + { + throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " + + "queue(" + queue + ") but no message not enqueued on queue"); + } + + queues.remove(queue); } - - queues.remove(queue); } } @@ -450,21 +499,29 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog "no enqueue data available"); } - if (!queues.isEmpty()) + if (queues.size() > 1) { throw new RuntimeException("Removed a message(" + messageId + ") that still had references."); } + MessageMetaData mmd; synchronized (_storeMetaData) { - _storeMetaData.remove(messageId); + mmd = _storeMetaData.remove(messageId); } + ArrayList<ContentChunk> chunks; synchronized (_storeChunks) { - _storeChunks.remove(messageId); + chunks = _storeChunks.remove(messageId); } + //Record the remove for part of the transaction + if (context.inTransaction()) + { + ArrayList transactionData = (ArrayList) context.getPayload(); + transactionData.add(new RemoveRecord(messageId, queues, mmd, chunks)); + } } // @@ -474,7 +531,7 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog public void beginTran(StoreContext context) throws AMQException { - context.setPayload(new Object()); + context.setPayload(new ArrayList()); } public void commitTran(StoreContext context) throws AMQException @@ -532,4 +589,20 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog { return false; } + + class RemoveRecord + { + MessageMetaData _mmd; + ArrayList<AMQQueue> _queues; + ArrayList<ContentChunk> _chunks; + Long _messageId; + + RemoveRecord(Long messageId, ArrayList<AMQQueue> queues, MessageMetaData mmd, ArrayList<ContentChunk> chunks) + { + _messageId = messageId; + _queues = queues; + _mmd = mmd; + _chunks = chunks; + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java new file mode 100644 index 0000000000..b0c47052b2 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transactionlog; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.store.TestTransactionLog; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.routing.RoutingTable; + +import java.util.List; +import java.util.LinkedList; + +public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog +{ + + List<Long> _singleEnqueues = new LinkedList<Long>(); + + public TestableTransactionLog() + { + super(null); + } + + public TestableTransactionLog(BaseTransactionLog delegate) + { + super(delegate.getDelegate()); + } + + public TestableTransactionLog(TransactionLog delegate) + { + super(delegate); + } + + + @Override + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + { + if (_delegate != null) + { + TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config); + + // Unwrap any BaseTransactionLog + if (configuredLog instanceof BaseTransactionLog) + { + _delegate = ((BaseTransactionLog)configuredLog).getDelegate(); + } + } + else + { + String delegateClass = config.getStoreConfiguration().getString("delegate"); + Class clazz = Class.forName(delegateClass); + Object o = clazz.newInstance(); + + if (!(o instanceof TransactionLog)) + { + throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz + + " does not."); + } + _delegate = (TransactionLog) o; + + // If a TransactionLog uses the BaseTransactionLog then it will return this object. + _delegate.configure(virtualHost, base, config); + } + return this; + } + + public List<AMQQueue> getMessageReferenceMap(Long messageID) + { + return _idToQueues.get(messageID); + } +} |
