From 4d692b9645b2d2c0842dd4d85e23d1d26e8dc34e Mon Sep 17 00:00:00 2001 From: Bhupendra Bhusman Bhardwaj Date: Tue, 13 Mar 2007 16:04:00 +0000 Subject: QPID-411 : ClearQueue functionality of AMQQueue doesn't reset the queue depth AMQQueueMBeanTest.java moved to Broker tests git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@517745 13f79535-47bb-0310-9956-ffa450edef68 --- .../queue/ConcurrentSelectorDeliveryManager.java | 1 + .../qpid/server/queue/AMQQueueAlertTest.java | 5 + .../qpid/server/queue/AMQQueueMBeanTest.java | 239 +++++++++++++++++++++ .../qpid/server/queue/AMQQueueMBeanTest.java | 228 -------------------- 4 files changed, 245 insertions(+), 228 deletions(-) create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java delete mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 6122d191f8..f9b5b5174c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -378,6 +378,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager msg = getNextMessage(); count++; } + _totalMessageSize.set(0L); } _lock.unlock(); return count; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index b0f520a8c3..1eb3506720 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -220,7 +220,12 @@ public class AMQQueueAlertTest extends TestCase assertTrue(_queueMBean.getQueueDepth() == totalSize); protocolSession.closeSession(); + + // Check the clear queue + _queueMBean.clearQueue(); + assertTrue(_queueMBean.getQueueDepth() == 0); } + protected AMQMessage message(final boolean immediate, long size) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java new file mode 100644 index 0000000000..182c6a2d01 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -0,0 +1,239 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.protocol.TestMinaProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.MemoryMessageStore; + +import javax.management.JMException; +import java.util.LinkedList; +import java.util.HashSet; + +/** + * Test class to test AMQQueueMBean attribtues and operations + */ +public class AMQQueueMBeanTest extends TestCase +{ + private static long MESSAGE_SIZE = 1000; + private AMQQueue _queue; + private AMQQueueMBean _queueMBean; + private MessageStore _messageStore = new MemoryMessageStore(); + private StoreContext _storeContext = new StoreContext(); + private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + null, + new LinkedList(), + new HashSet()); + private VirtualHost _virtualHost; + + public void testMessageCount() throws Exception + { + int messageCount = 10; + sendMessages(messageCount); + assertTrue(_queueMBean.getMessageCount() == messageCount); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; + assertTrue(_queueMBean.getQueueDepth() == queueDepth); + + _queueMBean.deleteMessageFromTop(); + assertTrue(_queueMBean.getMessageCount() == (messageCount - 1)); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + _queueMBean.clearQueue(); + assertTrue(_queueMBean.getMessageCount() == 0); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + } + + public void testConsumerCount() throws AMQException + { + SubscriptionManager mgr = _queue.getSubscribers(); + assertFalse(mgr.hasActiveSubscribers()); + assertTrue(_queueMBean.getActiveConsumerCount() == 0); + + + TestMinaProtocolSession protocolSession = new TestMinaProtocolSession(); + AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); + protocolSession.addChannel(channel); + + _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false); + assertTrue(_queueMBean.getActiveConsumerCount() == 1); + + SubscriptionSet _subscribers = (SubscriptionSet) mgr; + SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory(); + Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S1"), + false, + null, + true, + _queue); + + Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S2"), + false, + null, + true, + _queue); + _subscribers.addSubscriber(s1); + _subscribers.addSubscriber(s2); + assertTrue(_queueMBean.getActiveConsumerCount() == 3); + assertTrue(_queueMBean.getConsumerCount() == 3); + + s1.close(); + assertTrue(_queueMBean.getActiveConsumerCount() == 2); + assertTrue(_queueMBean.getConsumerCount() == 3); + } + + public void testGeneralProperties() + { + long maxQueueDepth = 1000; // in bytes + _queueMBean.setMaximumMessageCount(50000); + _queueMBean.setMaximumMessageSize(2000l); + _queueMBean.setMaximumQueueDepth(maxQueueDepth); + + assertTrue(_queueMBean.getMaximumMessageCount() == 50000); + assertTrue(_queueMBean.getMaximumMessageSize() == 2000); + assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10)); + + assertTrue(_queueMBean.getName().equals("testQueue")); + assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest")); + assertFalse(_queueMBean.isAutoDelete()); + assertFalse(_queueMBean.isDurable()); + } + + public void testExceptions() throws Exception + { + try + { + _queueMBean.viewMessages(0, 3); + fail(); + } + catch (JMException ex) + { + + } + + try + { + _queueMBean.viewMessages(2, 1); + fail(); + } + catch (JMException ex) + { + + } + + try + { + _queueMBean.viewMessages(-1, 1); + fail(); + } + catch (JMException ex) + { + + } + + AMQMessage msg = message(false); + long id = msg.getMessageId(); + _queue.clearQueue(_storeContext); + + msg.enqueue(_queue); + msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); + _queue.process(_storeContext, msg, false); + _queueMBean.viewMessageContent(id); + try + { + _queueMBean.viewMessageContent(id + 1); + fail(); + } + catch (JMException ex) + { + + } + } + + private AMQMessage message(final boolean immediate) throws AMQException + { + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes + return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); + } + + @Override + protected void setUp() throws Exception + { + super.setUp(); + IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost); + _queueMBean = new AMQQueueMBean(_queue); + } + + private void sendMessages(int messageCount) throws AMQException + { + AMQMessage[] messages = new AMQMessage[messageCount]; + for (int i = 0; i < messages.length; i++) + { + messages[i] = message(false); + messages[i].enqueue(_queue); + messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); + } + for (int i = 0; i < messageCount; i++) + { + _queue.process(_storeContext, messages[i], false); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java deleted file mode 100644 index 015138ee6f..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.store.StoreContext; - -import javax.management.JMException; -import java.util.LinkedList; -import java.util.HashSet; - -/** - * Test class to test AMQQueueMBean attribtues and operations - */ -public class AMQQueueMBeanTest extends TestCase -{ - private static long MESSAGE_SIZE = 1000; - private AMQQueue _queue; - private AMQQueueMBean _queueMBean; - private QueueRegistry _queueRegistry; - private MessageStore _messageStore = new SkeletonMessageStore(); - private StoreContext _storeContext = new StoreContext(); - private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, - null, - new LinkedList(), - new HashSet()); - private MockProtocolSession _protocolSession; - private AMQChannel _channel; - private VirtualHost _virtualHost; - - public void testMessageCount() throws Exception - { - int messageCount = 10; - sendMessages(messageCount); - assertTrue(_queueMBean.getMessageCount() == messageCount); - assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); - long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; - assertTrue(_queueMBean.getQueueDepth() == queueDepth); - - _queueMBean.deleteMessageFromTop(); - assertTrue(_queueMBean.getMessageCount() == messageCount - 1); - assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); - - _queueMBean.clearQueue(); - assertTrue(_queueMBean.getMessageCount() == 0); - assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); - } - - public void testConsumerCount() throws Exception - { - SubscriptionManager mgr = _queue.getSubscribers(); - assertFalse(mgr.hasActiveSubscribers()); - assertTrue(_queueMBean.getActiveConsumerCount() == 0); - - - _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(_protocolSession, 1, _messageStore, null); - _protocolSession.addChannel(_channel); - - _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false); - assertTrue(_queueMBean.getActiveConsumerCount() == 1); - - SubscriptionSet _subscribers = (SubscriptionSet) mgr; - SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1"); - SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2"); - _subscribers.addSubscriber(s1); - _subscribers.addSubscriber(s2); - assertTrue(_queueMBean.getActiveConsumerCount() == 3); - assertTrue(_queueMBean.getConsumerCount() == 3); - - s1.setSuspended(true); - assertTrue(_queueMBean.getActiveConsumerCount() == 2); - assertTrue(_queueMBean.getConsumerCount() == 3); - } - - public void testGeneralProperties() - { - long maxQueueDepth = 1000; // in bytes - _queueMBean.setMaximumMessageCount(50000); - _queueMBean.setMaximumMessageSize(2000l); - _queueMBean.setMaximumQueueDepth(maxQueueDepth); - - assertTrue(_queueMBean.getMaximumMessageCount() == 50000); - assertTrue(_queueMBean.getMaximumMessageSize() == 2000); - assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10)); - - assertTrue(_queueMBean.getName().equals("testQueue")); - assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest")); - assertFalse(_queueMBean.isAutoDelete()); - assertFalse(_queueMBean.isDurable()); - } - - public void testExceptions() throws Exception - { - try - { - _queueMBean.viewMessages(0, 3); - fail(); - } - catch (JMException ex) - { - - } - - try - { - _queueMBean.viewMessages(2, 1); - fail(); - } - catch (JMException ex) - { - - } - - try - { - _queueMBean.viewMessages(-1, 1); - fail(); - } - catch (JMException ex) - { - - } - - AMQMessage msg = message(false); - long id = msg.getMessageId(); - _queue.clearQueue(_storeContext); - - msg.enqueue(_queue); - msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); - _queue.process(_storeContext, msg, false); - _queueMBean.viewMessageContent(id); - try - { - _queueMBean.viewMessageContent(id + 1); - fail(); - } - catch (JMException ex) - { - - } - } - - private AMQMessage message(final boolean immediate) throws AMQException - { - MessagePublishInfo publish = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public boolean isImmediate() - { - return immediate; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.bodySize = 1000; // in bytes - return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); - } - - @Override - protected void setUp() throws Exception - { - super.setUp(); - IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); - _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); - _queueRegistry = _virtualHost.getQueueRegistry(); - _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost); - _queueMBean = new AMQQueueMBean(_queue); - } - - private void sendMessages(int messageCount) throws AMQException - { - AMQMessage[] messages = new AMQMessage[messageCount]; - for (int i = 0; i < messages.length; i++) - { - messages[i] = message(false); - messages[i].enqueue(_queue); - messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); - } - for (int i = 0; i < messageCount; i++) - { - _queue.process(_storeContext, messages[i], false); - } - } -} -- cgit v1.2.1