diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 1 | ||||
| -rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java | 5 | ||||
| -rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (renamed from java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java) | 43 |
3 files changed, 33 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 6122d191f8..f9b5b5174c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -378,6 +378,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager msg = getNextMessage(); count++; } + _totalMessageSize.set(0L); } _lock.unlock(); return count; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index b0f520a8c3..1eb3506720 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -220,7 +220,12 @@ public class AMQQueueAlertTest extends TestCase assertTrue(_queueMBean.getQueueDepth() == totalSize); protocolSession.closeSession(); + + // Check the clear queue + _queueMBean.clearQueue(); + assertTrue(_queueMBean.getQueueDepth() == 0); } + protected AMQMessage message(final boolean immediate, long size) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 015138ee6f..182c6a2d01 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -24,14 +24,15 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.protocol.TestMinaProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.MemoryMessageStore; import javax.management.JMException; import java.util.LinkedList; @@ -45,15 +46,12 @@ public class AMQQueueMBeanTest extends TestCase private static long MESSAGE_SIZE = 1000; private AMQQueue _queue; private AMQQueueMBean _queueMBean; - private QueueRegistry _queueRegistry; - private MessageStore _messageStore = new SkeletonMessageStore(); + private MessageStore _messageStore = new MemoryMessageStore(); private StoreContext _storeContext = new StoreContext(); private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, null, new LinkedList<RequiredDeliveryException>(), new HashSet<Long>()); - private MockProtocolSession _protocolSession; - private AMQChannel _channel; private VirtualHost _virtualHost; public void testMessageCount() throws Exception @@ -66,7 +64,7 @@ public class AMQQueueMBeanTest extends TestCase assertTrue(_queueMBean.getQueueDepth() == queueDepth); _queueMBean.deleteMessageFromTop(); - assertTrue(_queueMBean.getMessageCount() == messageCount - 1); + assertTrue(_queueMBean.getMessageCount() == (messageCount - 1)); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); _queueMBean.clearQueue(); @@ -74,29 +72,43 @@ public class AMQQueueMBeanTest extends TestCase assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); } - public void testConsumerCount() throws Exception + public void testConsumerCount() throws AMQException { SubscriptionManager mgr = _queue.getSubscribers(); assertFalse(mgr.hasActiveSubscribers()); assertTrue(_queueMBean.getActiveConsumerCount() == 0); - _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(_protocolSession, 1, _messageStore, null); - _protocolSession.addChannel(_channel); + TestMinaProtocolSession protocolSession = new TestMinaProtocolSession(); + AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); + protocolSession.addChannel(channel); - _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false); + _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false); assertTrue(_queueMBean.getActiveConsumerCount() == 1); SubscriptionSet _subscribers = (SubscriptionSet) mgr; - SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1"); - SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2"); + SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory(); + Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S1"), + false, + null, + true, + _queue); + + Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S2"), + false, + null, + true, + _queue); _subscribers.addSubscriber(s1); _subscribers.addSubscriber(s2); assertTrue(_queueMBean.getActiveConsumerCount() == 3); assertTrue(_queueMBean.getConsumerCount() == 3); - s1.setSuspended(true); + s1.close(); assertTrue(_queueMBean.getActiveConsumerCount() == 2); assertTrue(_queueMBean.getConsumerCount() == 3); } @@ -196,7 +208,7 @@ public class AMQQueueMBeanTest extends TestCase }; ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.bodySize = 1000; // in bytes + contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); } @@ -206,7 +218,6 @@ public class AMQQueueMBeanTest extends TestCase super.setUp(); IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); - _queueRegistry = _virtualHost.getQueueRegistry(); _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost); _queueMBean = new AMQQueueMBean(_queue); } |
