diff options
| author | Rupert Smith <rupertlssmith@apache.org> | 2007-07-10 14:40:04 +0000 |
|---|---|---|
| committer | Rupert Smith <rupertlssmith@apache.org> | 2007-07-10 14:40:04 +0000 |
| commit | 1f7170e3aa3651072b7af53b6929b7477457796b (patch) | |
| tree | 3b63a00cb18ff3b9f3657c218a5da0dc6ff06d50 /java | |
| parent | 2898a1c9b720a7947d2338a34d6b7fb3a0c0a6a9 (diff) | |
| download | qpid-python-1f7170e3aa3651072b7af53b6929b7477457796b.tar.gz | |
Added message copy method.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@554964 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java | 102 |
1 files changed, 83 insertions, 19 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 08111a423c..0b7b5e93d9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -268,11 +268,8 @@ public class AMQQueue implements Managable, Comparable } /** - * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for - * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup - * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue - * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove - * locks from both queues and start async delivery + * Moves messages from this queue to another queue, and also commits the move on the message store. Delivery activity + * on the queues being moved between is suspended during the move. * * @param fromMessageId The first message id to move. * @param toMessageId The last message id to move. @@ -288,6 +285,11 @@ public class AMQQueue implements Managable, Comparable MessageStore fromStore = getVirtualHost().getMessageStore(); MessageStore toStore = toQueue.getVirtualHost().getMessageStore(); + if (toStore != fromStore) + { + throw new RuntimeException("Can only move messages between queues on the same message store."); + } + try { // Obtain locks to prevent activity on the queues being moved between. @@ -301,11 +303,6 @@ public class AMQQueue implements Managable, Comparable { fromStore.beginTran(storeContext); - if (toStore != fromStore) - { - toStore.beginTran(storeContext); - } - // Move the messages in on the message store. for (AMQMessage message : foundMessagesList) { @@ -317,11 +314,6 @@ public class AMQQueue implements Managable, Comparable try { fromStore.commitTran(storeContext); - - if (toStore != fromStore) - { - toStore.commitTran(storeContext); - } } catch (AMQException e) { @@ -338,11 +330,83 @@ public class AMQQueue implements Managable, Comparable try { fromStore.abortTran(storeContext); + } + catch (AMQException ae) + { + throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae); + } + } + } + // Release locks to allow activity on the queues being moved between to continue. + finally + { + toQueue.stopMovingMessages(); + stopMovingMessages(); + } + } + + /** + * Copies messages on this queue to another queue, and also commits the move on the message store. Delivery activity + * on the queues being moved between is suspended during the move. + * + * @param fromMessageId The first message id to move. + * @param toMessageId The last message id to move. + * @param queueName The queue to move the messages to. + * @param storeContext The context of the message store under which to perform the move. This is associated with + * the stores transactional context. + */ + public synchronized void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, + StoreContext storeContext) + { + AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + + MessageStore fromStore = getVirtualHost().getMessageStore(); + MessageStore toStore = toQueue.getVirtualHost().getMessageStore(); + + if (toStore != fromStore) + { + throw new RuntimeException("Can only move messages between queues on the same message store."); + } - if (toStore != fromStore) - { - toStore.abortTran(storeContext); - } + try + { + // Obtain locks to prevent activity on the queues being moved between. + startMovingMessages(); + toQueue.startMovingMessages(); + + // Get the list of messages to move. + List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); + + try + { + fromStore.beginTran(storeContext); + + // Move the messages in on the message store. + for (AMQMessage message : foundMessagesList) + { + toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId()); + message.takeReference(); + } + + // Commit and flush the move transcations. + try + { + fromStore.commitTran(storeContext); + } + catch (AMQException e) + { + throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); + } + + // Move the messages on the in-memory queues. + toQueue.enqueueMovedMessages(storeContext, foundMessagesList); + } + // Abort the move transactions on move failures. + catch (AMQException e) + { + try + { + fromStore.abortTran(storeContext); } catch (AMQException ae) { |
