From 5aedab1b101894dd21a45eacc1fb9b4c8becc7de Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Wed, 4 Jul 2007 11:11:04 +0000 Subject: Messages moved by management console now commited on the message store. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@553172 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/queue/AMQQueue.java | 161 +++++++++++++-------- .../apache/qpid/server/queue/AMQQueueMBean.java | 66 ++++----- 2 files changed, 125 insertions(+), 102 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 1e1eaa2813..08111a423c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -1,25 +1,16 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ +/* Copyright Rupert Smith, 2005 to 2007, all rights reserved. */ package org.apache.qpid.server.queue; +import java.text.MessageFormat; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.JMException; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -31,19 +22,10 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; -import javax.management.JMException; - -import java.text.MessageFormat; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described * fully in RFC 006. @@ -156,26 +138,26 @@ public class AMQQueue implements Managable, Comparable public int compareTo(Object o) { - return _name.compareTo(((AMQQueue) o).getName()); + return _name.compareTo(((AMQQueue)o).getName()); } public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) - throws AMQException + throws AMQException { this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), - new SubscriptionSet(), new SubscriptionImpl.Factory()); + new SubscriptionSet(), new SubscriptionImpl.Factory()); } protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, - VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException + VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException { this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, - new SubscriptionImpl.Factory()); + new SubscriptionImpl.Factory()); } protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, - VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers, - SubscriptionFactory subscriptionFactory) throws AMQException + VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers, + SubscriptionFactory subscriptionFactory) throws AMQException { if (name == null) { @@ -252,7 +234,7 @@ public class AMQQueue implements Managable, Comparable } /** - * Returns messages within the given range of message Ids + * Returns messages within the given range of message Ids. * * @param fromMessageId * @param toMessageId @@ -292,32 +274,86 @@ public class AMQQueue implements Managable, Comparable * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove * locks from both queues and start async delivery * - * @param fromMessageId - * @param toMessageId - * @param queueName - * @param storeContext + * @param fromMessageId The first message id to move. + * @param toMessageId The last message id to move. + * @param queueName The queue to move the messages to. + * @param storeContext The context of the message store under which to perform the move. This is associated with + * the stores transactional context. */ public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, - StoreContext storeContext) + StoreContext storeContext) { - // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock - AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); + + MessageStore fromStore = getVirtualHost().getMessageStore(); + MessageStore toStore = toQueue.getVirtualHost().getMessageStore(); + try { + // Obtain locks to prevent activity on the queues being moved between. startMovingMessages(); - List foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); + toQueue.startMovingMessages(); - // move messages to another queue - anotherQueue.startMovingMessages(); - anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList); + // Get the list of messages to move. + List foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); - // moving is successful, now remove from original queue - _deliveryMgr.removeMovedMessages(foundMessagesList); + try + { + fromStore.beginTran(storeContext); + + if (toStore != fromStore) + { + toStore.beginTran(storeContext); + } + + // Move the messages in on the message store. + for (AMQMessage message : foundMessagesList) + { + fromStore.dequeueMessage(storeContext, _name, message.getMessageId()); + toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId()); + } + + // Commit and flush the move transcations. + try + { + fromStore.commitTran(storeContext); + + if (toStore != fromStore) + { + toStore.commitTran(storeContext); + } + } + catch (AMQException e) + { + throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e); + } + + // Move the messages on the in-memory queues. + toQueue.enqueueMovedMessages(storeContext, foundMessagesList); + _deliveryMgr.removeMovedMessages(foundMessagesList); + } + // Abort the move transactions on move failures. + catch (AMQException e) + { + try + { + fromStore.abortTran(storeContext); + + if (toStore != fromStore) + { + toStore.abortTran(storeContext); + } + } + catch (AMQException ae) + { + throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae); + } + } } + // Release locks to allow activity on the queues being moved between to continue. finally { - // remove the lock and start the async delivery - anotherQueue.stopMovingMessages(); + toQueue.stopMovingMessages(); stopMovingMessages(); } } @@ -432,7 +468,7 @@ public class AMQQueue implements Managable, Comparable } public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException { if (incrementSubscriberCount() > 1) { @@ -455,9 +491,8 @@ public class AMQQueue implements Managable, Comparable if (_logger.isDebugEnabled()) { - _logger.debug(MessageFormat.format( - "Registering protocol session {0} with channel {1} and " + "consumer tag {2} with {3}", ps, channel, - consumerTag, this)); + _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " + + "consumer tag {2} with {3}", ps, channel, consumerTag, this)); } Subscription subscription = @@ -499,17 +534,17 @@ public class AMQQueue implements Managable, Comparable if (_logger.isDebugEnabled()) { _logger.debug(MessageFormat.format( - "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, - consumerTag, this)); + "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", + ps, channel, consumerTag, this)); } Subscription removedSubscription; - if ((removedSubscription = - _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag))) + if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, + consumerTag))) == null) { throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag - + " and protocol session key " + ps.getKey() + " not registered with queue " + this); + + " and protocol session key " + ps.getKey() + " not registered with queue " + this); } removedSubscription.close(); @@ -688,7 +723,7 @@ public class AMQQueue implements Managable, Comparable return false; } - final AMQQueue amqQueue = (AMQQueue) o; + final AMQQueue amqQueue = (AMQQueue)o; return (_name.equals(amqQueue._name)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index bbaa7379f6..07872d7644 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -18,30 +18,23 @@ * under the License. * */ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ package org.apache.qpid.server.queue; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.Iterator; -import java.util.List; +import org.apache.log4j.Logger; + +import org.apache.mina.common.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.CommonContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.store.StoreContext; import javax.management.JMException; import javax.management.MBeanException; @@ -60,30 +53,25 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; -import org.apache.log4j.Logger; - -import org.apache.mina.common.ByteBuffer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.CommonContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.store.StoreContext; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; +import java.util.List; /** - * MBean class for AMQQueue. It implements all the management features exposed - * for an AMQQueue. + * AMQQueueMBean is the management bean for an {@link AMQQueue}. + * + *

CRC Caption + * Responsibilities Collaborations + * */ @MBeanDescription("Management Interface for AMQQueue") public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { + /** Used for debugging purposes. */ private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); + private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z"); /** -- cgit v1.2.1