summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-07-04 11:11:04 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-07-04 11:11:04 +0000
commit5aedab1b101894dd21a45eacc1fb9b4c8becc7de (patch)
tree285de2d1c79e2dfc6f7c7d6e369c99001295e882 /java
parente4f9a8d7e300a4267b1b61a8c6839f08df648e6b (diff)
downloadqpid-python-5aedab1b101894dd21a45eacc1fb9b4c8becc7de.tar.gz
Messages moved by management console now commited on the message store.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@553172 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java161
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java66
2 files changed, 125 insertions, 102 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 1e1eaa2813..08111a423c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -1,25 +1,16 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
+/* Copyright Rupert Smith, 2005 to 2007, all rights reserved. */
package org.apache.qpid.server.queue;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -31,19 +22,10 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import javax.management.JMException;
-
-import java.text.MessageFormat;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
* fully in RFC 006.
@@ -156,26 +138,26 @@ public class AMQQueue implements Managable, Comparable
public int compareTo(Object o)
{
- return _name.compareTo(((AMQQueue) o).getName());
+ return _name.compareTo(((AMQQueue)o).getName());
}
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
+ throws AMQException
{
this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
- new SubscriptionSet(), new SubscriptionImpl.Factory());
+ new SubscriptionSet(), new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
- VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
+ VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
{
this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
- new SubscriptionImpl.Factory());
+ new SubscriptionImpl.Factory());
}
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
- VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
- SubscriptionFactory subscriptionFactory) throws AMQException
+ VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
+ SubscriptionFactory subscriptionFactory) throws AMQException
{
if (name == null)
{
@@ -252,7 +234,7 @@ public class AMQQueue implements Managable, Comparable
}
/**
- * Returns messages within the given range of message Ids
+ * Returns messages within the given range of message Ids.
*
* @param fromMessageId
* @param toMessageId
@@ -292,32 +274,86 @@ public class AMQQueue implements Managable, Comparable
* (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
* locks from both queues and start async delivery
*
- * @param fromMessageId
- * @param toMessageId
- * @param queueName
- * @param storeContext
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param queueName The queue to move the messages to.
+ * @param storeContext The context of the message store under which to perform the move. This is associated with
+ * the stores transactional context.
*/
public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext)
+ StoreContext storeContext)
{
- // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock
- AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+ MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+
try
{
+ // Obtain locks to prevent activity on the queues being moved between.
startMovingMessages();
- List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+ toQueue.startMovingMessages();
- // move messages to another queue
- anotherQueue.startMovingMessages();
- anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ // Get the list of messages to move.
+ List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
- // moving is successful, now remove from original queue
- _deliveryMgr.removeMovedMessages(foundMessagesList);
+ try
+ {
+ fromStore.beginTran(storeContext);
+
+ if (toStore != fromStore)
+ {
+ toStore.beginTran(storeContext);
+ }
+
+ // Move the messages in on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+ toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ fromStore.commitTran(storeContext);
+
+ if (toStore != fromStore)
+ {
+ toStore.commitTran(storeContext);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+
+ // Move the messages on the in-memory queues.
+ toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+ _deliveryMgr.removeMovedMessages(foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
+
+ if (toStore != fromStore)
+ {
+ toStore.abortTran(storeContext);
+ }
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+ }
+ }
}
+ // Release locks to allow activity on the queues being moved between to continue.
finally
{
- // remove the lock and start the async delivery
- anotherQueue.stopMovingMessages();
+ toQueue.stopMovingMessages();
stopMovingMessages();
}
}
@@ -432,7 +468,7 @@ public class AMQQueue implements Managable, Comparable
}
public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
{
if (incrementSubscriberCount() > 1)
{
@@ -455,9 +491,8 @@ public class AMQQueue implements Managable, Comparable
if (_logger.isDebugEnabled())
{
- _logger.debug(MessageFormat.format(
- "Registering protocol session {0} with channel {1} and " + "consumer tag {2} with {3}", ps, channel,
- consumerTag, this));
+ _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and "
+ + "consumer tag {2} with {3}", ps, channel, consumerTag, this));
}
Subscription subscription =
@@ -499,17 +534,17 @@ public class AMQQueue implements Managable, Comparable
if (_logger.isDebugEnabled())
{
_logger.debug(MessageFormat.format(
- "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel,
- consumerTag, this));
+ "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}",
+ ps, channel, consumerTag, this));
}
Subscription removedSubscription;
- if ((removedSubscription =
- _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag)))
+ if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps,
+ consumerTag)))
== null)
{
throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag
- + " and protocol session key " + ps.getKey() + " not registered with queue " + this);
+ + " and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
removedSubscription.close();
@@ -688,7 +723,7 @@ public class AMQQueue implements Managable, Comparable
return false;
}
- final AMQQueue amqQueue = (AMQQueue) o;
+ final AMQQueue amqQueue = (AMQQueue)o;
return (_name.equals(amqQueue._name));
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index bbaa7379f6..07872d7644 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -18,30 +18,23 @@
* under the License.
*
*/
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
package org.apache.qpid.server.queue;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.log4j.Logger;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.store.StoreContext;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -60,30 +53,25 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
/**
- * MBean class for AMQQueue. It implements all the management features exposed
- * for an AMQQueue.
+ * AMQQueueMBean is the management bean for an {@link AMQQueue}.
+ *
+ * <p/><tablse id="crc"><caption>CRC Caption</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
*/
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+ /** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
/**