diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java | 19 |
1 files changed, 17 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java index 973ecd6c09..dded7f7142 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.Iterator; public class BaseTransactionLog implements TransactionLog { @@ -80,15 +81,18 @@ public class BaseTransactionLog implements TransactionLog Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap(); //For each Message ID that is in the map check - for (Long messageID : messageMap.keySet()) + Iterator iterator = messageMap.keySet().iterator(); + + while (iterator.hasNext()) { + Long messageID = (Long) iterator.next(); //If we don't have a gloabl reference for this message then there is only a single enqueue if (_idToQueues.get(messageID) == null) { // Add the removal of the message to this transaction _delegate.removeMessage(context,messageID); // Remove this message ID as we have processed it so we don't reprocess after the main commmit - messageMap.remove(messageID); + iterator.remove(); } } } @@ -179,6 +183,15 @@ public class BaseTransactionLog implements TransactionLog } else { + //When a message is on more than one queue it is possible that this code section is exectuted + // by one thread per enqueue. + // It is however, thread safe because there is only removes being performed and so the + // last thread that does the remove will see the empty queue and remove the message + // At this stage there is nothing that is going to cause this operation to abort. So we don't + // need to worry about any potential adds. + // The message will no longer be enqueued as that operation has been committed before now so + // this is clean up of the data. + // Update the enqueued list enqueuedList.remove(queue); @@ -195,6 +208,8 @@ public class BaseTransactionLog implements TransactionLog //Commit the removes on the delegate. _delegate.commitTran(removeContext); + // Mark this context as committed. + removeContext.commitTransaction(); } finally { |
