diff options
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 31 | ||||
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java | 43 |
2 files changed, 65 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 6805d8261e..6a19acddd7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -823,6 +823,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); TransactionLog transactionLog = getVirtualHost().getTransactionLog(); + + if (toQueue.equals(this)) + { + //nothing to do here, message is already at the requested destination + return; + } List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -848,19 +854,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Move the messages in the transaction log. for (QueueEntry entry : entries) { - if (entry.isPersistent() && toQueue.isDurable()) + if (entry.isPersistent()) { //FIXME //fixme - ArrayList list = new ArrayList(); + + // Creating a list with the destination queue AND the current queue. + // This is a hack to ensure a reference is kept in the TLog to the new destination when dequeing + // the old destination below, thus preventing incorrect removal of the message from the store + ArrayList<AMQQueue> list = new ArrayList<AMQQueue>(); list.add(toQueue); + list.add(this); transactionLog.enqueueMessage(storeContext, list, entry.getMessageId()); } // dequeue will remove the messages from the queue entry.dequeue(storeContext); } - // Commit and flush the move transcations. + // Commit and flush the move transactions. try { transactionLog.commitTran(storeContext); @@ -891,7 +902,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener toQueue.enqueue(storeContext, entry.getMessage()); // As we only did a dequeue above now that we have moved the message we should perform a delete. // We cannot do this earlier as the message will be lost if flowed. - //entry.delete(); + entry.delete(); } } catch (MessageCleanupException e) @@ -913,6 +924,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); TransactionLog transactionLog = getVirtualHost().getTransactionLog(); + if (toQueue.equals(this)) + { + //nothing to do here, message is already at the requested destination + return; + } + List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -944,11 +961,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Move the messages in on the transaction log. for (QueueEntry entry : entries) { - if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable()) + if (!entry.isDeleted() && entry.isPersistent()) { //fixme //FIXME + + // Creating a list with the destination queue AND the current queue. + // This is a hack to ensure a reference is kept in the TLog to the old destination when enqueing ArrayList list = new ArrayList(); + list.add(this); list.add(toQueue); transactionLog.enqueueMessage(storeContext, list, entry.getMessageId()); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java index e200de7a6f..8d7b22d470 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java @@ -67,12 +67,47 @@ public class BaseTransactionLog implements TransactionLog { _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues); } + + //list to hold which new queues to enqueue the message on + ArrayList<AMQQueue> toEnqueueList = new ArrayList<AMQQueue>(); + + List<AMQQueue> enqueuedList = _idToQueues.get(messageId); + if (enqueuedList != null) + { + //There are previous enqueues for this messageId + synchronized (enqueuedList) + { + for(AMQQueue queue : queues) + { + if(!enqueuedList.contains(queue)) + { + //update the old list. + enqueuedList.add(queue); + //keep track of new enqueues to be made + toEnqueueList.add(queue); + } + } + } + + if(toEnqueueList.isEmpty()) + { + //no new queues to enqueue message on + return; + } + } + else + { + //No existing list, add all provided queues (cloning toEnqueueList in case someone else changes original). + toEnqueueList.addAll(queues); + _idToQueues.put(messageId, Collections.synchronizedList((ArrayList<AMQQueue>)toEnqueueList.clone())); + } - //Clone the list incase someone else changes it. - _idToQueues.put(messageId, Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone())); + _delegate.enqueueMessage(context, toEnqueueList, messageId); + } + else + { + _delegate.enqueueMessage(context, queues, messageId); } - - _delegate.enqueueMessage(context, queues, messageId); } public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException |
