summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java1
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (renamed from java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java)43
3 files changed, 33 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 6122d191f8..f9b5b5174c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -378,6 +378,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
msg = getNextMessage();
count++;
}
+ _totalMessageSize.set(0L);
}
_lock.unlock();
return count;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index b0f520a8c3..1eb3506720 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -220,7 +220,12 @@ public class AMQQueueAlertTest extends TestCase
assertTrue(_queueMBean.getQueueDepth() == totalSize);
protocolSession.closeSession();
+
+ // Check the clear queue
+ _queueMBean.clearQueue();
+ assertTrue(_queueMBean.getQueueDepth() == 0);
}
+
protected AMQMessage message(final boolean immediate, long size) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 015138ee6f..182c6a2d01 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -24,14 +24,15 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.TestMinaProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MemoryMessageStore;
import javax.management.JMException;
import java.util.LinkedList;
@@ -45,15 +46,12 @@ public class AMQQueueMBeanTest extends TestCase
private static long MESSAGE_SIZE = 1000;
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
- private QueueRegistry _queueRegistry;
- private MessageStore _messageStore = new SkeletonMessageStore();
+ private MessageStore _messageStore = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
null,
new LinkedList<RequiredDeliveryException>(),
new HashSet<Long>());
- private MockProtocolSession _protocolSession;
- private AMQChannel _channel;
private VirtualHost _virtualHost;
public void testMessageCount() throws Exception
@@ -66,7 +64,7 @@ public class AMQQueueMBeanTest extends TestCase
assertTrue(_queueMBean.getQueueDepth() == queueDepth);
_queueMBean.deleteMessageFromTop();
- assertTrue(_queueMBean.getMessageCount() == messageCount - 1);
+ assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
_queueMBean.clearQueue();
@@ -74,29 +72,43 @@ public class AMQQueueMBeanTest extends TestCase
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
}
- public void testConsumerCount() throws Exception
+ public void testConsumerCount() throws AMQException
{
SubscriptionManager mgr = _queue.getSubscribers();
assertFalse(mgr.hasActiveSubscribers());
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- _protocolSession = new MockProtocolSession(_messageStore);
- _channel = new AMQChannel(_protocolSession, 1, _messageStore, null);
- _protocolSession.addChannel(_channel);
+ TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
+ AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
+ protocolSession.addChannel(channel);
- _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false);
+ _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false);
assertTrue(_queueMBean.getActiveConsumerCount() == 1);
SubscriptionSet _subscribers = (SubscriptionSet) mgr;
- SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1");
- SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2");
+ SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory();
+ Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
+ protocolSession,
+ new AMQShortString("S1"),
+ false,
+ null,
+ true,
+ _queue);
+
+ Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
+ protocolSession,
+ new AMQShortString("S2"),
+ false,
+ null,
+ true,
+ _queue);
_subscribers.addSubscriber(s1);
_subscribers.addSubscriber(s2);
assertTrue(_queueMBean.getActiveConsumerCount() == 3);
assertTrue(_queueMBean.getConsumerCount() == 3);
- s1.setSuspended(true);
+ s1.close();
assertTrue(_queueMBean.getActiveConsumerCount() == 2);
assertTrue(_queueMBean.getConsumerCount() == 3);
}
@@ -196,7 +208,7 @@ public class AMQQueueMBeanTest extends TestCase
};
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = 1000; // in bytes
+ contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
}
@@ -206,7 +218,6 @@ public class AMQQueueMBeanTest extends TestCase
super.setUp();
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
- _queueRegistry = _virtualHost.getQueueRegistry();
_queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
_queueMBean = new AMQQueueMBean(_queue);
}