diff options
| author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-07 11:39:21 +0000 |
|---|---|---|
| committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-07 11:39:21 +0000 |
| commit | df552a1a4862615bad7335409fc6df1a80368911 (patch) | |
| tree | 1951cc23b814f557a5086bdf19260fc0dbc2d196 /java | |
| parent | 4a016bad34628ed7aa0f2409b80098290cf023a6 (diff) | |
| download | qpid-python-df552a1a4862615bad7335409fc6df1a80368911.tar.gz | |
1. Fixed the AMQQueueMBeanTest failures due to changes in AMQQueuMBean.getQueueDepth() from queueDepth/1000 to (queueDepth >> 10)
2.
Revision: 513748
Author: bhupendrab
Date: 13:26:51, 02 March 2007
Message:
QPID-390
Added test case for all the AMQQueue alerts
----
Modified : /incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Added : /incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@515539 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
4 files changed, 236 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 254348dba0..056fb5fc01 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -81,8 +81,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"}; private static OpenType[] _msgContentAttributeTypes = new OpenType[4]; - private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; + private Notification _lastNotification = null; @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean(AMQQueue queue) throws JMException @@ -256,11 +256,17 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { // important : add log to the log file - monitoring tools may be looking for this _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg); - - Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, + notificationMsg = notification.name() + " " + notificationMsg; + + _lastNotification = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); - _broadcaster.sendNotification(n); + _broadcaster.sendNotification(_lastNotification); + } + + public Notification getLastNotification() + { + return _lastNotification; } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 5cf08c857e..87868f0b25 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -354,14 +354,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void removeAMessageFromTop(StoreContext storeContext) throws AMQException { _lock.lock(); - AMQMessage msg = getNextMessage(); - if (msg != null) - { - // mark this message as taken and get it removed - msg.taken(null); - _queue.dequeue(storeContext, msg); - getNextMessage(); - } + + AMQMessage message = _messages.poll(); + _totalMessageSize.addAndGet(-message.getSize()); _lock.unlock(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java new file mode 100644 index 0000000000..ebfd18ddca --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -0,0 +1,212 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; + +import javax.management.Notification; +import java.util.LinkedList; +import java.util.HashSet; + +/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ +public class AMQQueueAlertTest extends TestCase +{ + private final static int MAX_MESSAGE_COUNT = 50; + private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec + private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB + private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB + private AMQQueue _queue; + private AMQQueueMBean _queueMBean; + private VirtualHost _virtualHost; + private MessageStore _messageStore = new MemoryMessageStore(); + private StoreContext _storeContext = new StoreContext(); + private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + + /** + * Tests if the alert gets thrown when message count increases the threshold limit + * + * @throws Exception + */ + public void testMessageCountAlert() throws Exception + { + _queue = new AMQQueue(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"), + false, _virtualHost); + _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); + + _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); + + sendMessages(MAX_MESSAGE_COUNT, 256l); + assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT); + + Notification lastNotification = _queueMBean.getLastNotification(); + assertNotNull(lastNotification); + + String notificationMsg = lastNotification.getMessage(); + assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_COUNT_ALERT.name())); + } + + /** + * Tests if the Message Size alert gets thrown when message of higher than threshold limit is sent + * + * @throws Exception + */ + public void testMessageSizeAlert() throws Exception + { + _queue = new AMQQueue(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"), + false, _virtualHost); + _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); + _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); + _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE); + + sendMessages(1, MAX_MESSAGE_SIZE * 2); + assertTrue(_queueMBean.getMessageCount() == 1); + + Notification lastNotification = _queueMBean.getLastNotification(); + assertNotNull(lastNotification); + + String notificationMsg = lastNotification.getMessage(); + assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_SIZE_ALERT.name())); + } + + /** + * Tests if Queue Depth alert is thrown when queue depth reaches the threshold value + * + * @throws Exception + */ + public void testQueueDepthAlert() throws Exception + { + _queue = new AMQQueue(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"), + false, _virtualHost); + _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); + _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); + _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH); + + while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH) + { + sendMessages(1, MAX_MESSAGE_SIZE); + } + + Notification lastNotification = _queueMBean.getLastNotification(); + assertNotNull(lastNotification); + + String notificationMsg = lastNotification.getMessage(); + assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name())); + } + + /** + * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of + * message age + * + * @throws Exception + */ + public void testMessageAgeAlert() throws Exception + { + _queue = new AMQQueue(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"), + false, _virtualHost); + _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); + _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); + _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE); + + sendMessages(1, MAX_MESSAGE_SIZE); + + // Ensure message sits on queue long enough to age. + Thread.sleep(MAX_MESSAGE_AGE * 2); + + sendMessages(1, MAX_MESSAGE_SIZE); + assertTrue(_queueMBean.getMessageCount() == 2); + + Notification lastNotification = _queueMBean.getLastNotification(); + assertNotNull(lastNotification); + + String notificationMsg = lastNotification.getMessage(); + assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_AGE_ALERT.name())); + } + + protected AMQMessage message(final boolean immediate, long size) throws AMQException + { + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); + contentHeaderBody.bodySize = size; // in bytes + AMQMessage message = new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext); + message.setContentHeaderBody(contentHeaderBody); + return message; + } + + @Override + protected void setUp() throws Exception + { + super.setUp(); + IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); + _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + } + + private void sendMessages(int messageCount, long size) throws AMQException + { + AMQMessage[] messages = new AMQMessage[messageCount]; + for (int i = 0; i < messages.length; i++) + { + messages[i] = message(false, size); + messages[i].enqueue(_queue); + messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); + } + + for (int i = 0; i < messageCount; i++) + { + _queue.process(_storeContext, messages[i], false); + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 26332579cb..015138ee6f 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -42,6 +42,7 @@ import java.util.HashSet; */ public class AMQQueueMBeanTest extends TestCase { + private static long MESSAGE_SIZE = 1000; private AMQQueue _queue; private AMQQueueMBean _queueMBean; private QueueRegistry _queueRegistry; @@ -61,7 +62,8 @@ public class AMQQueueMBeanTest extends TestCase sendMessages(messageCount); assertTrue(_queueMBean.getMessageCount() == messageCount); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); - assertTrue(_queueMBean.getQueueDepth() == 10); + long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; + assertTrue(_queueMBean.getQueueDepth() == queueDepth); _queueMBean.deleteMessageFromTop(); assertTrue(_queueMBean.getMessageCount() == messageCount - 1); @@ -101,13 +103,14 @@ public class AMQQueueMBeanTest extends TestCase public void testGeneralProperties() { + long maxQueueDepth = 1000; // in bytes _queueMBean.setMaximumMessageCount(50000); _queueMBean.setMaximumMessageSize(2000l); - _queueMBean.setMaximumQueueDepth(1000l); + _queueMBean.setMaximumQueueDepth(maxQueueDepth); assertTrue(_queueMBean.getMaximumMessageCount() == 50000); assertTrue(_queueMBean.getMaximumMessageSize() == 2000); - assertTrue(_queueMBean.getMaximumQueueDepth() == 1000); + assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10)); assertTrue(_queueMBean.getName().equals("testQueue")); assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest")); @@ -150,8 +153,10 @@ public class AMQQueueMBeanTest extends TestCase AMQMessage msg = message(false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); - _queue.process(_storeContext, msg, false); + + msg.enqueue(_queue); msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); + _queue.process(_storeContext, msg, false); _queueMBean.viewMessageContent(id); try { @@ -212,15 +217,12 @@ public class AMQQueueMBeanTest extends TestCase for (int i = 0; i < messages.length; i++) { messages[i] = message(false); + messages[i].enqueue(_queue); + messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); } for (int i = 0; i < messageCount; i++) { _queue.process(_storeContext, messages[i], false); } - - for (int i = 0; i < messages.length; i++) - { - messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); - } } } |
