diff options
| author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-13 16:04:00 +0000 |
|---|---|---|
| committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-13 16:04:00 +0000 |
| commit | d7ffa02047b5bfeea5c0289976d5355b69a5a11c (patch) | |
| tree | aebed39d0c6fa6b63b18658e4323e55b3099bb78 /java/broker | |
| parent | d697dcb5a267ab1cb363d3a6e675a5f6f505b838 (diff) | |
| download | qpid-python-d7ffa02047b5bfeea5c0289976d5355b69a5a11c.tar.gz | |
QPID-411 : ClearQueue functionality of AMQQueue doesn't reset the queue depth
AMQQueueMBeanTest.java moved to Broker tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@517745 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
3 files changed, 245 insertions, 0 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 6122d191f8..f9b5b5174c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -378,6 +378,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager msg = getNextMessage(); count++; } + _totalMessageSize.set(0L); } _lock.unlock(); return count; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index b0f520a8c3..1eb3506720 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -220,7 +220,12 @@ public class AMQQueueAlertTest extends TestCase assertTrue(_queueMBean.getQueueDepth() == totalSize); protocolSession.closeSession(); + + // Check the clear queue + _queueMBean.clearQueue(); + assertTrue(_queueMBean.getQueueDepth() == 0); } + protected AMQMessage message(final boolean immediate, long size) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java new file mode 100644 index 0000000000..182c6a2d01 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -0,0 +1,239 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.protocol.TestMinaProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.MemoryMessageStore; + +import javax.management.JMException; +import java.util.LinkedList; +import java.util.HashSet; + +/** + * Test class to test AMQQueueMBean attribtues and operations + */ +public class AMQQueueMBeanTest extends TestCase +{ + private static long MESSAGE_SIZE = 1000; + private AMQQueue _queue; + private AMQQueueMBean _queueMBean; + private MessageStore _messageStore = new MemoryMessageStore(); + private StoreContext _storeContext = new StoreContext(); + private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + private VirtualHost _virtualHost; + + public void testMessageCount() throws Exception + { + int messageCount = 10; + sendMessages(messageCount); + assertTrue(_queueMBean.getMessageCount() == messageCount); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; + assertTrue(_queueMBean.getQueueDepth() == queueDepth); + + _queueMBean.deleteMessageFromTop(); + assertTrue(_queueMBean.getMessageCount() == (messageCount - 1)); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + _queueMBean.clearQueue(); + assertTrue(_queueMBean.getMessageCount() == 0); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + } + + public void testConsumerCount() throws AMQException + { + SubscriptionManager mgr = _queue.getSubscribers(); + assertFalse(mgr.hasActiveSubscribers()); + assertTrue(_queueMBean.getActiveConsumerCount() == 0); + + + TestMinaProtocolSession protocolSession = new TestMinaProtocolSession(); + AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); + protocolSession.addChannel(channel); + + _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false); + assertTrue(_queueMBean.getActiveConsumerCount() == 1); + + SubscriptionSet _subscribers = (SubscriptionSet) mgr; + SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory(); + Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S1"), + false, + null, + true, + _queue); + + Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S2"), + false, + null, + true, + _queue); + _subscribers.addSubscriber(s1); + _subscribers.addSubscriber(s2); + assertTrue(_queueMBean.getActiveConsumerCount() == 3); + assertTrue(_queueMBean.getConsumerCount() == 3); + + s1.close(); + assertTrue(_queueMBean.getActiveConsumerCount() == 2); + assertTrue(_queueMBean.getConsumerCount() == 3); + } + + public void testGeneralProperties() + { + long maxQueueDepth = 1000; // in bytes + _queueMBean.setMaximumMessageCount(50000); + _queueMBean.setMaximumMessageSize(2000l); + _queueMBean.setMaximumQueueDepth(maxQueueDepth); + + assertTrue(_queueMBean.getMaximumMessageCount() == 50000); + assertTrue(_queueMBean.getMaximumMessageSize() == 2000); + assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10)); + + assertTrue(_queueMBean.getName().equals("testQueue")); + assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest")); + assertFalse(_queueMBean.isAutoDelete()); + assertFalse(_queueMBean.isDurable()); + } + + public void testExceptions() throws Exception + { + try + { + _queueMBean.viewMessages(0, 3); + fail(); + } + catch (JMException ex) + { + + } + + try + { + _queueMBean.viewMessages(2, 1); + fail(); + } + catch (JMException ex) + { + + } + + try + { + _queueMBean.viewMessages(-1, 1); + fail(); + } + catch (JMException ex) + { + + } + + AMQMessage msg = message(false); + long id = msg.getMessageId(); + _queue.clearQueue(_storeContext); + + msg.enqueue(_queue); + msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); + _queue.process(_storeContext, msg, false); + _queueMBean.viewMessageContent(id); + try + { + _queueMBean.viewMessageContent(id + 1); + fail(); + } + catch (JMException ex) + { + + } + } + + private AMQMessage message(final boolean immediate) throws AMQException + { + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes + return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); + } + + @Override + protected void setUp() throws Exception + { + super.setUp(); + IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost); + _queueMBean = new AMQQueueMBean(_queue); + } + + private void sendMessages(int messageCount) throws AMQException + { + AMQMessage[] messages = new AMQMessage[messageCount]; + for (int i = 0; i < messages.length; i++) + { + messages[i] = message(false); + messages[i].enqueue(_queue); + messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); + } + for (int i = 0; i < messageCount; i++) + { + _queue.process(_storeContext, messages[i], false); + } + } +} |
