summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-07-10 14:40:04 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-07-10 14:40:04 +0000
commit1f7170e3aa3651072b7af53b6929b7477457796b (patch)
tree3b63a00cb18ff3b9f3657c218a5da0dc6ff06d50 /java
parent2898a1c9b720a7947d2338a34d6b7fb3a0c0a6a9 (diff)
downloadqpid-python-1f7170e3aa3651072b7af53b6929b7477457796b.tar.gz
Added message copy method.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@554964 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java102
1 files changed, 83 insertions, 19 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 08111a423c..0b7b5e93d9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -268,11 +268,8 @@ public class AMQQueue implements Managable, Comparable
}
/**
- * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
- * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup
- * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue
- * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
- * locks from both queues and start async delivery
+ * Moves messages from this queue to another queue, and also commits the move on the message store. Delivery activity
+ * on the queues being moved between is suspended during the move.
*
* @param fromMessageId The first message id to move.
* @param toMessageId The last message id to move.
@@ -288,6 +285,11 @@ public class AMQQueue implements Managable, Comparable
MessageStore fromStore = getVirtualHost().getMessageStore();
MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+ if (toStore != fromStore)
+ {
+ throw new RuntimeException("Can only move messages between queues on the same message store.");
+ }
+
try
{
// Obtain locks to prevent activity on the queues being moved between.
@@ -301,11 +303,6 @@ public class AMQQueue implements Managable, Comparable
{
fromStore.beginTran(storeContext);
- if (toStore != fromStore)
- {
- toStore.beginTran(storeContext);
- }
-
// Move the messages in on the message store.
for (AMQMessage message : foundMessagesList)
{
@@ -317,11 +314,6 @@ public class AMQQueue implements Managable, Comparable
try
{
fromStore.commitTran(storeContext);
-
- if (toStore != fromStore)
- {
- toStore.commitTran(storeContext);
- }
}
catch (AMQException e)
{
@@ -338,11 +330,83 @@ public class AMQQueue implements Managable, Comparable
try
{
fromStore.abortTran(storeContext);
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+ }
+ }
+ }
+ // Release locks to allow activity on the queues being moved between to continue.
+ finally
+ {
+ toQueue.stopMovingMessages();
+ stopMovingMessages();
+ }
+ }
+
+ /**
+ * Copies messages on this queue to another queue, and also commits the move on the message store. Delivery activity
+ * on the queues being moved between is suspended during the move.
+ *
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param queueName The queue to move the messages to.
+ * @param storeContext The context of the message store under which to perform the move. This is associated with
+ * the stores transactional context.
+ */
+ public synchronized void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
+ StoreContext storeContext)
+ {
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+ MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
+ if (toStore != fromStore)
+ {
+ throw new RuntimeException("Can only move messages between queues on the same message store.");
+ }
- if (toStore != fromStore)
- {
- toStore.abortTran(storeContext);
- }
+ try
+ {
+ // Obtain locks to prevent activity on the queues being moved between.
+ startMovingMessages();
+ toQueue.startMovingMessages();
+
+ // Get the list of messages to move.
+ List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+ try
+ {
+ fromStore.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
+ message.takeReference();
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ fromStore.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+
+ // Move the messages on the in-memory queues.
+ toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
}
catch (AMQException ae)
{