summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java77
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java30
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java43
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java6
9 files changed, 187 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 3cd343e1b2..a668016f93 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -93,8 +93,7 @@ public class AMQChannel
private IncomingMessage _currentMessage;
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
-
+ protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
private final MessageStore _messageStore;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 408defe453..9419572399 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -82,6 +82,8 @@ public interface Subscription
void setStateListener(final StateListener listener);
+ public State getState();
+
QueueEntry getLastSeenEntry();
boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 556b87590c..df84270275 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -79,6 +79,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
+ @Override
public boolean isBrowser()
{
return true;
@@ -91,6 +92,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param msg The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry msg) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
@@ -103,6 +105,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
}
+
+ @Override
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return false;
+ }
+
}
public static class NoAckSubscription extends SubscriptionImpl
@@ -118,6 +127,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
+ @Override
public boolean isBrowser()
{
return false;
@@ -130,6 +140,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param entry The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry entry) throws AMQException
{
@@ -166,6 +177,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
}
+ @Override
public boolean wouldSuspend(QueueEntry msg)
{
return false;
@@ -185,6 +197,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
+ @Override
public boolean isBrowser()
{
return false;
@@ -198,6 +211,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param entry The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry entry) throws AMQException
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 51012bc776..da35ddc594 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -72,7 +72,14 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen
{
synchronized (_channelDelivers)
{
- List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count);
+ List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
+
+ if (all == null)
+ {
+ return new ArrayList<DeliveryPair>(0);
+ }
+
+ List<DeliveryPair> msgs = all.subList(0, count);
List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index fc15729893..db2f8a57ad 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -29,8 +29,6 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
-import org.apache.qpid.server.subscription.Subscription.State;
-import org.apache.qpid.server.subscription.Subscription.StateListener;
public class MockSubscription implements Subscription
{
@@ -40,15 +38,15 @@ public class MockSubscription implements Subscription
private AMQQueue queue = null;
private StateListener listener = null;
private QueueEntry lastSeen = null;
- private State state = State.ACTIVE;
+ private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
@Override
public void close()
{
closed = true;
- listener.stateChange(this, state , State.CLOSED);
- state = State.CLOSED;
+ listener.stateChange(this, _state, State.CLOSED);
+ _state = State.CLOSED;
}
@Override
@@ -179,6 +177,11 @@ public class MockSubscription implements Subscription
this.listener = listener;
}
+ public State getState()
+ {
+ return _state;
+ }
+
@Override
public boolean wouldSuspend(QueueEntry msg)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java
new file mode 100644
index 0000000000..d0db4ebd38
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+
+public class QueueBrowserUsesNoAckTest extends InternalBrokerBaseCase
+{
+
+ public void testQueueBrowserUsesNoAck() throws AMQException
+ {
+ int sendMessageCount = 2;
+ int prefetch = 1;
+
+ //Check store is empty
+ checkStoreContents(0);
+
+ //Send required messsages to the queue
+ publishMessages(_session, _channel, sendMessageCount);
+
+ //Ensure they are stored
+ checkStoreContents(sendMessageCount);
+
+ //Check that there are no unacked messages
+ assertEquals("Channel should have no unacked msgs ", 0,
+ _channel.getUnacknowledgedMessageMap().size());
+
+ //Set the prefetch on the session to be less than the sent messages
+ _channel.setCredit(0, prefetch);
+
+ //browse the queue
+ AMQShortString browser = browse(_channel, _queue);
+
+ _queue.deliverAsync();
+
+ //Wait for messages to fill the prefetch
+ _session.awaitDelivery(prefetch);
+
+ //Get those messages
+ List<InternalTestProtocolSession.DeliveryPair> messages =
+ _session.getDelivers(_channel.getChannelId(), browser,
+ prefetch);
+
+ //Ensure we recevied the prefetched messages
+ assertEquals(prefetch, messages.size());
+
+ //Check the process didn't suspend the subscription as this would
+ // indicate we are using the prefetch credit. i.e. using acks not No-Ack
+ assertTrue("The subscription has been suspended",
+ !_channel.getSubscription(browser).getState()
+ .equals(Subscription.State.SUSPENDED));
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 28eab73995..509ea817fd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -36,15 +36,18 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
+import org.apache.qpid.util.MockChannel;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
public class InternalBrokerBaseCase extends TestCase
{
protected IApplicationRegistry _registry;
protected MessageStore _messageStore;
- protected AMQChannel _channel;
+ protected MockChannel _channel;
protected InternalTestProtocolSession _session;
protected VirtualHost _virtualHost;
protected StoreContext _storeContext = new StoreContext();
@@ -74,7 +77,7 @@ public class InternalBrokerBaseCase extends TestCase
_session.setVirtualHost(_virtualHost);
- _channel = new AMQChannel(_session, 1, _messageStore);
+ _channel = new MockChannel(_session, 1, _messageStore);
_session.addChannel(_channel);
}
@@ -113,6 +116,29 @@ public class InternalBrokerBaseCase extends TestCase
return null;
}
+ protected AMQShortString browse(AMQChannel channel, AMQQueue queue)
+ {
+ try
+ {
+ FieldTable filters = new FieldTable();
+ filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
+
+ return channel.subscribeToQueue(null, queue, true, filters, false, true);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ catch (ConsumerTagNotUniqueException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ //Keep the compiler happy
+ return null;
+ }
+
public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException
{
MessagePublishInfo info = new MessagePublishInfo()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java b/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
new file mode 100644
index 0000000000..447d09429d
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+public class MockChannel extends AMQChannel
+{
+ public MockChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+ throws AMQException
+ {
+ super(session, channelId, messageStore);
+ }
+
+ public Subscription getSubscription(AMQShortString subscription)
+ {
+ return _tag2SubscriptionMap.get(subscription);
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index eed60a1a7c..c20c0bc579 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -110,6 +110,12 @@ public class SubscriptionTestHelper implements Subscription
{
//To change body of implemented methods use File | Settings | File Templates.
}
+
+ @Override
+ public State getState()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
public QueueEntry getLastSeenEntry()
{