summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java31
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java43
2 files changed, 65 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 6805d8261e..6a19acddd7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -823,6 +823,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
TransactionLog transactionLog = getVirtualHost().getTransactionLog();
+
+ if (toQueue.equals(this))
+ {
+ //nothing to do here, message is already at the requested destination
+ return;
+ }
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -848,19 +854,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Move the messages in the transaction log.
for (QueueEntry entry : entries)
{
- if (entry.isPersistent() && toQueue.isDurable())
+ if (entry.isPersistent())
{
//FIXME
//fixme
- ArrayList list = new ArrayList();
+
+ // Creating a list with the destination queue AND the current queue.
+ // This is a hack to ensure a reference is kept in the TLog to the new destination when dequeing
+ // the old destination below, thus preventing incorrect removal of the message from the store
+ ArrayList<AMQQueue> list = new ArrayList<AMQQueue>();
list.add(toQueue);
+ list.add(this);
transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
}
// dequeue will remove the messages from the queue
entry.dequeue(storeContext);
}
- // Commit and flush the move transcations.
+ // Commit and flush the move transactions.
try
{
transactionLog.commitTran(storeContext);
@@ -891,7 +902,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
toQueue.enqueue(storeContext, entry.getMessage());
// As we only did a dequeue above now that we have moved the message we should perform a delete.
// We cannot do this earlier as the message will be lost if flowed.
- //entry.delete();
+ entry.delete();
}
}
catch (MessageCleanupException e)
@@ -913,6 +924,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
TransactionLog transactionLog = getVirtualHost().getTransactionLog();
+ if (toQueue.equals(this))
+ {
+ //nothing to do here, message is already at the requested destination
+ return;
+ }
+
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -944,11 +961,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Move the messages in on the transaction log.
for (QueueEntry entry : entries)
{
- if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable())
+ if (!entry.isDeleted() && entry.isPersistent())
{
//fixme
//FIXME
+
+ // Creating a list with the destination queue AND the current queue.
+ // This is a hack to ensure a reference is kept in the TLog to the old destination when enqueing
ArrayList list = new ArrayList();
+ list.add(this);
list.add(toQueue);
transactionLog.enqueueMessage(storeContext, list, entry.getMessageId());
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
index e200de7a6f..8d7b22d470 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
@@ -67,12 +67,47 @@ public class BaseTransactionLog implements TransactionLog
{
_logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
}
+
+ //list to hold which new queues to enqueue the message on
+ ArrayList<AMQQueue> toEnqueueList = new ArrayList<AMQQueue>();
+
+ List<AMQQueue> enqueuedList = _idToQueues.get(messageId);
+ if (enqueuedList != null)
+ {
+ //There are previous enqueues for this messageId
+ synchronized (enqueuedList)
+ {
+ for(AMQQueue queue : queues)
+ {
+ if(!enqueuedList.contains(queue))
+ {
+ //update the old list.
+ enqueuedList.add(queue);
+ //keep track of new enqueues to be made
+ toEnqueueList.add(queue);
+ }
+ }
+ }
+
+ if(toEnqueueList.isEmpty())
+ {
+ //no new queues to enqueue message on
+ return;
+ }
+ }
+ else
+ {
+ //No existing list, add all provided queues (cloning toEnqueueList in case someone else changes original).
+ toEnqueueList.addAll(queues);
+ _idToQueues.put(messageId, Collections.synchronizedList((ArrayList<AMQQueue>)toEnqueueList.clone()));
+ }
- //Clone the list incase someone else changes it.
- _idToQueues.put(messageId, Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone()));
+ _delegate.enqueueMessage(context, toEnqueueList, messageId);
+ }
+ else
+ {
+ _delegate.enqueueMessage(context, queues, messageId);
}
-
- _delegate.enqueueMessage(context, queues, messageId);
}
public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException