summaryrefslogtreecommitdiff
path: root/java/broker/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/test')
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java9
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java13
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java77
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java30
-rw-r--r--java/broker/src/test/java/org/apache/qpid/util/MockChannel.java43
5 files changed, 164 insertions, 8 deletions
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 51012bc776..da35ddc594 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -72,7 +72,14 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen
{
synchronized (_channelDelivers)
{
- List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count);
+ List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
+
+ if (all == null)
+ {
+ return new ArrayList<DeliveryPair>(0);
+ }
+
+ List<DeliveryPair> msgs = all.subList(0, count);
List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index fc15729893..db2f8a57ad 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -29,8 +29,6 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
-import org.apache.qpid.server.subscription.Subscription.State;
-import org.apache.qpid.server.subscription.Subscription.StateListener;
public class MockSubscription implements Subscription
{
@@ -40,15 +38,15 @@ public class MockSubscription implements Subscription
private AMQQueue queue = null;
private StateListener listener = null;
private QueueEntry lastSeen = null;
- private State state = State.ACTIVE;
+ private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
@Override
public void close()
{
closed = true;
- listener.stateChange(this, state , State.CLOSED);
- state = State.CLOSED;
+ listener.stateChange(this, _state, State.CLOSED);
+ _state = State.CLOSED;
}
@Override
@@ -179,6 +177,11 @@ public class MockSubscription implements Subscription
this.listener = listener;
}
+ public State getState()
+ {
+ return _state;
+ }
+
@Override
public boolean wouldSuspend(QueueEntry msg)
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java
new file mode 100644
index 0000000000..d0db4ebd38
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+
+public class QueueBrowserUsesNoAckTest extends InternalBrokerBaseCase
+{
+
+ public void testQueueBrowserUsesNoAck() throws AMQException
+ {
+ int sendMessageCount = 2;
+ int prefetch = 1;
+
+ //Check store is empty
+ checkStoreContents(0);
+
+ //Send required messsages to the queue
+ publishMessages(_session, _channel, sendMessageCount);
+
+ //Ensure they are stored
+ checkStoreContents(sendMessageCount);
+
+ //Check that there are no unacked messages
+ assertEquals("Channel should have no unacked msgs ", 0,
+ _channel.getUnacknowledgedMessageMap().size());
+
+ //Set the prefetch on the session to be less than the sent messages
+ _channel.setCredit(0, prefetch);
+
+ //browse the queue
+ AMQShortString browser = browse(_channel, _queue);
+
+ _queue.deliverAsync();
+
+ //Wait for messages to fill the prefetch
+ _session.awaitDelivery(prefetch);
+
+ //Get those messages
+ List<InternalTestProtocolSession.DeliveryPair> messages =
+ _session.getDelivers(_channel.getChannelId(), browser,
+ prefetch);
+
+ //Ensure we recevied the prefetched messages
+ assertEquals(prefetch, messages.size());
+
+ //Check the process didn't suspend the subscription as this would
+ // indicate we are using the prefetch credit. i.e. using acks not No-Ack
+ assertTrue("The subscription has been suspended",
+ !_channel.getSubscription(browser).getState()
+ .equals(Subscription.State.SUSPENDED));
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 28eab73995..509ea817fd 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -36,15 +36,18 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
+import org.apache.qpid.util.MockChannel;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
public class InternalBrokerBaseCase extends TestCase
{
protected IApplicationRegistry _registry;
protected MessageStore _messageStore;
- protected AMQChannel _channel;
+ protected MockChannel _channel;
protected InternalTestProtocolSession _session;
protected VirtualHost _virtualHost;
protected StoreContext _storeContext = new StoreContext();
@@ -74,7 +77,7 @@ public class InternalBrokerBaseCase extends TestCase
_session.setVirtualHost(_virtualHost);
- _channel = new AMQChannel(_session, 1, _messageStore);
+ _channel = new MockChannel(_session, 1, _messageStore);
_session.addChannel(_channel);
}
@@ -113,6 +116,29 @@ public class InternalBrokerBaseCase extends TestCase
return null;
}
+ protected AMQShortString browse(AMQChannel channel, AMQQueue queue)
+ {
+ try
+ {
+ FieldTable filters = new FieldTable();
+ filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
+
+ return channel.subscribeToQueue(null, queue, true, filters, false, true);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ catch (ConsumerTagNotUniqueException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ //Keep the compiler happy
+ return null;
+ }
+
public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException
{
MessagePublishInfo info = new MessagePublishInfo()
diff --git a/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java b/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
new file mode 100644
index 0000000000..447d09429d
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+public class MockChannel extends AMQChannel
+{
+ public MockChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+ throws AMQException
+ {
+ super(session, channelId, messageStore);
+ }
+
+ public Subscription getSubscription(AMQShortString subscription)
+ {
+ return _tag2SubscriptionMap.get(subscription);
+ }
+
+}