diff options
Diffstat (limited to 'qpid/java')
9 files changed, 187 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3cd343e1b2..a668016f93 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -93,8 +93,7 @@ public class AMQChannel private IncomingMessage _currentMessage; /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ - private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); - + protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); private final MessageStore _messageStore; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 408defe453..9419572399 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -82,6 +82,8 @@ public interface Subscription void setStateListener(final StateListener listener); + public State getState(); + QueueEntry getLastSeenEntry(); boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 556b87590c..df84270275 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -79,6 +79,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } + @Override public boolean isBrowser() { return true; @@ -91,6 +92,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param msg The message to send * @throws AMQException */ + @Override public void send(QueueEntry msg) throws AMQException { // We don't decrement the reference here as we don't want to consume the message @@ -103,6 +105,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } } + + @Override + public boolean wouldSuspend(QueueEntry msg) + { + return false; + } + } public static class NoAckSubscription extends SubscriptionImpl @@ -118,6 +127,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } + @Override public boolean isBrowser() { return false; @@ -130,6 +140,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param entry The message to send * @throws AMQException */ + @Override public void send(QueueEntry entry) throws AMQException { @@ -166,6 +177,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } } + @Override public boolean wouldSuspend(QueueEntry msg) { return false; @@ -185,6 +197,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } + @Override public boolean isBrowser() { return false; @@ -198,6 +211,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param entry The message to send * @throws AMQException */ + @Override public void send(QueueEntry entry) throws AMQException { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 51012bc776..da35ddc594 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -72,7 +72,14 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen { synchronized (_channelDelivers) { - List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count); + List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag); + + if (all == null) + { + return new ArrayList<DeliveryPair>(0); + } + + List<DeliveryPair> msgs = all.subList(0, count); List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index fc15729893..db2f8a57ad 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -29,8 +29,6 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState; -import org.apache.qpid.server.subscription.Subscription.State; -import org.apache.qpid.server.subscription.Subscription.StateListener; public class MockSubscription implements Subscription { @@ -40,15 +38,15 @@ public class MockSubscription implements Subscription private AMQQueue queue = null; private StateListener listener = null; private QueueEntry lastSeen = null; - private State state = State.ACTIVE; + private State _state = State.ACTIVE; private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); @Override public void close() { closed = true; - listener.stateChange(this, state , State.CLOSED); - state = State.CLOSED; + listener.stateChange(this, _state, State.CLOSED); + _state = State.CLOSED; } @Override @@ -179,6 +177,11 @@ public class MockSubscription implements Subscription this.listener = listener; } + public State getState() + { + return _state; + } + @Override public boolean wouldSuspend(QueueEntry msg) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java new file mode 100644 index 0000000000..d0db4ebd38 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.AMQException; + +import java.util.List; + +public class QueueBrowserUsesNoAckTest extends InternalBrokerBaseCase +{ + + public void testQueueBrowserUsesNoAck() throws AMQException + { + int sendMessageCount = 2; + int prefetch = 1; + + //Check store is empty + checkStoreContents(0); + + //Send required messsages to the queue + publishMessages(_session, _channel, sendMessageCount); + + //Ensure they are stored + checkStoreContents(sendMessageCount); + + //Check that there are no unacked messages + assertEquals("Channel should have no unacked msgs ", 0, + _channel.getUnacknowledgedMessageMap().size()); + + //Set the prefetch on the session to be less than the sent messages + _channel.setCredit(0, prefetch); + + //browse the queue + AMQShortString browser = browse(_channel, _queue); + + _queue.deliverAsync(); + + //Wait for messages to fill the prefetch + _session.awaitDelivery(prefetch); + + //Get those messages + List<InternalTestProtocolSession.DeliveryPair> messages = + _session.getDelivers(_channel.getChannelId(), browser, + prefetch); + + //Ensure we recevied the prefetched messages + assertEquals(prefetch, messages.size()); + + //Check the process didn't suspend the subscription as this would + // indicate we are using the prefetch credit. i.e. using acks not No-Ack + assertTrue("The subscription has been suspended", + !_channel.getSubscription(browser).getState() + .equals(Subscription.State.SUSPENDED)); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 28eab73995..509ea817fd 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -36,15 +36,18 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.AMQException; +import org.apache.qpid.util.MockChannel; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; public class InternalBrokerBaseCase extends TestCase { protected IApplicationRegistry _registry; protected MessageStore _messageStore; - protected AMQChannel _channel; + protected MockChannel _channel; protected InternalTestProtocolSession _session; protected VirtualHost _virtualHost; protected StoreContext _storeContext = new StoreContext(); @@ -74,7 +77,7 @@ public class InternalBrokerBaseCase extends TestCase _session.setVirtualHost(_virtualHost); - _channel = new AMQChannel(_session, 1, _messageStore); + _channel = new MockChannel(_session, 1, _messageStore); _session.addChannel(_channel); } @@ -113,6 +116,29 @@ public class InternalBrokerBaseCase extends TestCase return null; } + protected AMQShortString browse(AMQChannel channel, AMQQueue queue) + { + try + { + FieldTable filters = new FieldTable(); + filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); + + return channel.subscribeToQueue(null, queue, true, filters, false, true); + } + catch (AMQException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + catch (ConsumerTagNotUniqueException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + //Keep the compiler happy + return null; + } + public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException { MessagePublishInfo info = new MessagePublishInfo() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java b/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java new file mode 100644 index 0000000000..447d09429d --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; + +public class MockChannel extends AMQChannel +{ + public MockChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) + throws AMQException + { + super(session, channelId, messageStore); + } + + public Subscription getSubscription(AMQShortString subscription) + { + return _tag2SubscriptionMap.get(subscription); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index eed60a1a7c..c20c0bc579 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -110,6 +110,12 @@ public class SubscriptionTestHelper implements Subscription { //To change body of implemented methods use File | Settings | File Templates. } + + @Override + public State getState() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } public QueueEntry getLastSeenEntry() { |
