diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java | 68 |
1 files changed, 23 insertions, 45 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 69a0bfbcf4..342782ac10 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import junit.framework.TestCase; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -31,32 +30,22 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.protocol.AMQProtocolEngine; import org.apache.qpid.server.protocol.InternalTestProtocolSession; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.util.InternalBrokerBaseCase; import javax.management.Notification; import java.util.ArrayList; /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ -public class AMQQueueAlertTest extends TestCase +public class AMQQueueAlertTest extends InternalBrokerBaseCase { private final static long MAX_MESSAGE_COUNT = 50; private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB - private AMQQueue _queue; private AMQQueueMBean _queueMBean; - private VirtualHost _virtualHost; - private AMQProtocolEngine _protocolSession; - private MessageStore _messageStore = new MemoryMessageStore(); private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE; /** @@ -66,9 +55,9 @@ public class AMQQueueAlertTest extends TestCase */ public void testMessageCountAlert() throws Exception { - _protocolSession = new InternalTestProtocolSession(_virtualHost); - AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); - _protocolSession.addChannel(channel); + _session = new InternalTestProtocolSession(_virtualHost); + AMQChannel channel = new AMQChannel(_session, 2, _messageStore); + _session.addChannel(channel); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"), false, false, @@ -94,9 +83,9 @@ public class AMQQueueAlertTest extends TestCase */ public void testMessageSizeAlert() throws Exception { - _protocolSession = new InternalTestProtocolSession(_virtualHost); - AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); - _protocolSession.addChannel(channel); + _session = new InternalTestProtocolSession(_virtualHost); + AMQChannel channel = new AMQChannel(_session, 2, _messageStore); + _session.addChannel(channel); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"), false, false, @@ -124,9 +113,9 @@ public class AMQQueueAlertTest extends TestCase */ public void testQueueDepthAlertNoSubscriber() throws Exception { - _protocolSession = new InternalTestProtocolSession(_virtualHost); - AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); - _protocolSession.addChannel(channel); + _session = new InternalTestProtocolSession(_virtualHost); + AMQChannel channel = new AMQChannel(_session, 2, _messageStore); + _session.addChannel(channel); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"), false, false, @@ -157,9 +146,9 @@ public class AMQQueueAlertTest extends TestCase */ public void testMessageAgeAlert() throws Exception { - _protocolSession = new InternalTestProtocolSession(_virtualHost); - AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); - _protocolSession.addChannel(channel); + _session = new InternalTestProtocolSession(_virtualHost); + AMQChannel channel = new AMQChannel(_session, 2, _messageStore); + _session.addChannel(channel); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"), false, false, @@ -174,7 +163,7 @@ public class AMQQueueAlertTest extends TestCase Thread.sleep(MAX_MESSAGE_AGE * 2); Notification lastNotification = _queueMBean.getLastNotification(); - assertNotNull(lastNotification); + assertNotNull("Last notification was null", lastNotification); String notificationMsg = lastNotification.getMessage(); assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_AGE_ALERT.name())); @@ -190,13 +179,13 @@ public class AMQQueueAlertTest extends TestCase */ public void testQueueDepthAlertWithSubscribers() throws Exception { - AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore); - _protocolSession.addChannel(channel); + AMQChannel channel = new AMQChannel(_session, 2, _messageStore); + _session.addChannel(channel); // Create queue _queue = getNewQueue(); Subscription subscription = - SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), _protocolSession, new AMQShortString("consumer_tag"), true, null, false, channel.getCreditManager()); + SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), _session, new AMQShortString("consumer_tag"), true, null, false, channel.getCreditManager()); _queue.registerSubscription( subscription, false); @@ -231,7 +220,7 @@ public class AMQQueueAlertTest extends TestCase // Connect a consumer again and check QueueDepth values. The queue should get emptied. // Messages will get delivered but still are unacknowledged. Subscription subscription2 = - SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), _protocolSession, new AMQShortString("consumer_tag"), true, null, false, channel.getCreditManager()); + SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), _session, new AMQShortString("consumer_tag"), true, null, false, channel.getCreditManager()); _queue.registerSubscription( subscription2, false); @@ -248,7 +237,7 @@ public class AMQQueueAlertTest extends TestCase channel.requeue(); assertEquals(new Long(totalSize), new Long(_queueMBean.getQueueDepth())); - _protocolSession.closeSession(); + _session.closeSession(); // Check the clear queue _queueMBean.clearQueue(); @@ -297,23 +286,12 @@ public class AMQQueueAlertTest extends TestCase } @Override - protected void setUp() throws Exception + protected void configure() { - super.setUp(); - IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); - _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); - _protocolSession = new InternalTestProtocolSession(_virtualHost); - CurrentActor.set(_protocolSession.getLogActor()); + // Increase Alert Check period + _configuration.setHousekeepingExpiredMessageCheckPeriod(500); } - protected void tearDown() - { - // Remove the Protocol Session Actor set above - CurrentActor.remove(); - ApplicationRegistry.remove(); - } - - private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException { IncomingMessage[] messages = new IncomingMessage[(int) messageCount]; |
