summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java19
1 files changed, 17 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
index 973ecd6c09..dded7f7142 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.Iterator;
public class BaseTransactionLog implements TransactionLog
{
@@ -80,15 +81,18 @@ public class BaseTransactionLog implements TransactionLog
Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
//For each Message ID that is in the map check
- for (Long messageID : messageMap.keySet())
+ Iterator iterator = messageMap.keySet().iterator();
+
+ while (iterator.hasNext())
{
+ Long messageID = (Long) iterator.next();
//If we don't have a gloabl reference for this message then there is only a single enqueue
if (_idToQueues.get(messageID) == null)
{
// Add the removal of the message to this transaction
_delegate.removeMessage(context,messageID);
// Remove this message ID as we have processed it so we don't reprocess after the main commmit
- messageMap.remove(messageID);
+ iterator.remove();
}
}
}
@@ -179,6 +183,15 @@ public class BaseTransactionLog implements TransactionLog
}
else
{
+ //When a message is on more than one queue it is possible that this code section is exectuted
+ // by one thread per enqueue.
+ // It is however, thread safe because there is only removes being performed and so the
+ // last thread that does the remove will see the empty queue and remove the message
+ // At this stage there is nothing that is going to cause this operation to abort. So we don't
+ // need to worry about any potential adds.
+ // The message will no longer be enqueued as that operation has been committed before now so
+ // this is clean up of the data.
+
// Update the enqueued list
enqueuedList.remove(queue);
@@ -195,6 +208,8 @@ public class BaseTransactionLog implements TransactionLog
//Commit the removes on the delegate.
_delegate.commitTran(removeContext);
+ // Mark this context as committed.
+ removeContext.commitTransaction();
}
finally
{