summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-07 11:39:21 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-07 11:39:21 +0000
commitdf552a1a4862615bad7335409fc6df1a80368911 (patch)
tree1951cc23b814f557a5086bdf19260fc0dbc2d196 /java
parent4a016bad34628ed7aa0f2409b80098290cf023a6 (diff)
downloadqpid-python-df552a1a4862615bad7335409fc6df1a80368911.tar.gz
1. Fixed the AMQQueueMBeanTest failures due to changes in AMQQueuMBean.getQueueDepth() from queueDepth/1000 to (queueDepth >> 10)
2. Revision: 513748 Author: bhupendrab Date: 13:26:51, 02 March 2007 Message: QPID-390 Added test case for all the AMQQueue alerts ---- Modified : /incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Added : /incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@515539 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java11
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java212
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java20
4 files changed, 236 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 254348dba0..056fb5fc01 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -81,8 +81,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+ private Notification _lastNotification = null;
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
@@ -256,11 +256,17 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
{
// important : add log to the log file - monitoring tools may be looking for this
_logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
-
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ notificationMsg = notification.name() + " " + notificationMsg;
+
+ _lastNotification = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
- _broadcaster.sendNotification(n);
+ _broadcaster.sendNotification(_lastNotification);
+ }
+
+ public Notification getLastNotification()
+ {
+ return _lastNotification;
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 5cf08c857e..87868f0b25 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -354,14 +354,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
_lock.lock();
- AMQMessage msg = getNextMessage();
- if (msg != null)
- {
- // mark this message as taken and get it removed
- msg.taken(null);
- _queue.dequeue(storeContext, msg);
- getNextMessage();
- }
+
+ AMQMessage message = _messages.poll();
+ _totalMessageSize.addAndGet(-message.getSize());
_lock.unlock();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
new file mode 100644
index 0000000000..ebfd18ddca
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -0,0 +1,212 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+import javax.management.Notification;
+import java.util.LinkedList;
+import java.util.HashSet;
+
+/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
+public class AMQQueueAlertTest extends TestCase
+{
+ private final static int MAX_MESSAGE_COUNT = 50;
+ private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec
+ private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB
+ private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB
+ private AMQQueue _queue;
+ private AMQQueueMBean _queueMBean;
+ private VirtualHost _virtualHost;
+ private MessageStore _messageStore = new MemoryMessageStore();
+ private StoreContext _storeContext = new StoreContext();
+ private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
+
+ /**
+ * Tests if the alert gets thrown when message count increases the threshold limit
+ *
+ * @throws Exception
+ */
+ public void testMessageCountAlert() throws Exception
+ {
+ _queue = new AMQQueue(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost);
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+
+ sendMessages(MAX_MESSAGE_COUNT, 256l);
+ assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
+
+ Notification lastNotification = _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_COUNT_ALERT.name()));
+ }
+
+ /**
+ * Tests if the Message Size alert gets thrown when message of higher than threshold limit is sent
+ *
+ * @throws Exception
+ */
+ public void testMessageSizeAlert() throws Exception
+ {
+ _queue = new AMQQueue(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost);
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+ _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
+
+ sendMessages(1, MAX_MESSAGE_SIZE * 2);
+ assertTrue(_queueMBean.getMessageCount() == 1);
+
+ Notification lastNotification = _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_SIZE_ALERT.name()));
+ }
+
+ /**
+ * Tests if Queue Depth alert is thrown when queue depth reaches the threshold value
+ *
+ * @throws Exception
+ */
+ public void testQueueDepthAlert() throws Exception
+ {
+ _queue = new AMQQueue(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost);
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+ _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
+
+ while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
+ {
+ sendMessages(1, MAX_MESSAGE_SIZE);
+ }
+
+ Notification lastNotification = _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
+ }
+
+ /**
+ * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of
+ * message age
+ *
+ * @throws Exception
+ */
+ public void testMessageAgeAlert() throws Exception
+ {
+ _queue = new AMQQueue(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
+ false, _virtualHost);
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+ _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
+
+ sendMessages(1, MAX_MESSAGE_SIZE);
+
+ // Ensure message sits on queue long enough to age.
+ Thread.sleep(MAX_MESSAGE_AGE * 2);
+
+ sendMessages(1, MAX_MESSAGE_SIZE);
+ assertTrue(_queueMBean.getMessageCount() == 2);
+
+ Notification lastNotification = _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_AGE_ALERT.name()));
+ }
+
+ protected AMQMessage message(final boolean immediate, long size) throws AMQException
+ {
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.bodySize = size; // in bytes
+ AMQMessage message = new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext);
+ message.setContentHeaderBody(contentHeaderBody);
+ return message;
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+ _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ }
+
+ private void sendMessages(int messageCount, long size) throws AMQException
+ {
+ AMQMessage[] messages = new AMQMessage[messageCount];
+ for (int i = 0; i < messages.length; i++)
+ {
+ messages[i] = message(false, size);
+ messages[i].enqueue(_queue);
+ messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+ }
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ _queue.process(_storeContext, messages[i], false);
+ }
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 26332579cb..015138ee6f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -42,6 +42,7 @@ import java.util.HashSet;
*/
public class AMQQueueMBeanTest extends TestCase
{
+ private static long MESSAGE_SIZE = 1000;
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private QueueRegistry _queueRegistry;
@@ -61,7 +62,8 @@ public class AMQQueueMBeanTest extends TestCase
sendMessages(messageCount);
assertTrue(_queueMBean.getMessageCount() == messageCount);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- assertTrue(_queueMBean.getQueueDepth() == 10);
+ long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+ assertTrue(_queueMBean.getQueueDepth() == queueDepth);
_queueMBean.deleteMessageFromTop();
assertTrue(_queueMBean.getMessageCount() == messageCount - 1);
@@ -101,13 +103,14 @@ public class AMQQueueMBeanTest extends TestCase
public void testGeneralProperties()
{
+ long maxQueueDepth = 1000; // in bytes
_queueMBean.setMaximumMessageCount(50000);
_queueMBean.setMaximumMessageSize(2000l);
- _queueMBean.setMaximumQueueDepth(1000l);
+ _queueMBean.setMaximumQueueDepth(maxQueueDepth);
assertTrue(_queueMBean.getMaximumMessageCount() == 50000);
assertTrue(_queueMBean.getMaximumMessageSize() == 2000);
- assertTrue(_queueMBean.getMaximumQueueDepth() == 1000);
+ assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10));
assertTrue(_queueMBean.getName().equals("testQueue"));
assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
@@ -150,8 +153,10 @@ public class AMQQueueMBeanTest extends TestCase
AMQMessage msg = message(false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
- _queue.process(_storeContext, msg, false);
+
+ msg.enqueue(_queue);
msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+ _queue.process(_storeContext, msg, false);
_queueMBean.viewMessageContent(id);
try
{
@@ -212,15 +217,12 @@ public class AMQQueueMBeanTest extends TestCase
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false);
+ messages[i].enqueue(_queue);
+ messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
}
for (int i = 0; i < messageCount; i++)
{
_queue.process(_storeContext, messages[i], false);
}
-
- for (int i = 0; i < messages.length; i++)
- {
- messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- }
}
}