diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-08-08 22:55:35 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-08-08 22:55:35 +0000 |
| commit | b4dda260b39c112fe578a0b4dd16b0429d820ff9 (patch) | |
| tree | 1620099f67d6335dd6ef15356be3f30f1793fa04 /qpid/java | |
| parent | 09d5704b80e7b7936ec913c107c4537faf643d73 (diff) | |
| download | qpid-python-b4dda260b39c112fe578a0b4dd16b0429d820ff9.tar.gz | |
QPID-3386: move all server Subscription creation into the SubscriptionFactoryImpl, ensure all Subscription implementations share a common ID sequence generator
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1155137 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
12 files changed, 221 insertions, 114 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java index fbc5387daf..2dff45c326 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.transport.ServerSession; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -696,7 +697,7 @@ public class Bridge implements BridgeConfig //TODO Handle the passing of non-null Filters and Arguments here - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, + Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session, _destination, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, @@ -768,7 +769,7 @@ public class Bridge implements BridgeConfig //TODO Handle the passing of non-null Filters and Arguments here - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, + Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session, _destination, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 790027f293..14ce85530e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -162,14 +162,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } else { - sub = new GetNoAckSubscription(channel, - session, - null, - null, - false, - singleMessageCredit, - getDeliveryMethod, - getRecordMethod); + sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod); } queue.registerSubscription(sub,false); @@ -180,27 +173,5 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } - public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription - { - public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - } - public boolean isTransient() - { - return true; - } - - public boolean wouldSuspend(QueueEntry msg) - { - return !getCreditManager().useCreditForMessage(msg.getMessage()); - } - - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java index ce0362d73f..0fd7fdffe5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java @@ -20,13 +20,21 @@ */ package org.apache.qpid.server.subscription; +import java.util.Map; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.transport.ServerSession; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageFlowMode; /** * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory @@ -56,4 +64,23 @@ public interface SubscriptionFactory RecordDeliveryMethod recordMethod ) throws AMQException; + + + SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(AMQChannel channel, + AMQProtocolSession session, + AMQShortString consumerTag, + FieldTable filters, + boolean noLocal, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException; + + Subscription_0_10 createSubscription(final ServerSession session, + final String destination, + final MessageAcceptMode acceptMode, + final MessageAcquireMode acquireMode, + final MessageFlowMode flowMode, + final FlowCreditManager_0_10 creditManager, + final FilterManager filterManager, + final Map<String,Object> arguments); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java index 1bba2529c6..1622d63648 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java @@ -20,17 +20,28 @@ */ package org.apache.qpid.server.subscription; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.transport.ServerSession; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageFlowMode; public class SubscriptionFactoryImpl implements SubscriptionFactory { + private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0); + public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal, FlowCreditManager creditManager) throws AMQException @@ -78,18 +89,47 @@ public class SubscriptionFactoryImpl implements SubscriptionFactory if(isBrowser) { - return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); + return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId()); } else if(acks) { - return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); + return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId()); } else { - return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); + return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId()); } } + public SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(final AMQChannel channel, + final AMQProtocolSession session, + final AMQShortString consumerTag, + final FieldTable filters, + final boolean noLocal, + final FlowCreditManager creditManager, + final ClientDeliveryMethod deliveryMethod, + final RecordDeliveryMethod recordMethod) throws AMQException + { + return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod, getNextSubscriptionId()); + } + + public Subscription_0_10 createSubscription(final ServerSession session, + final String destination, + final MessageAcceptMode acceptMode, + final MessageAcquireMode acquireMode, + final MessageFlowMode flowMode, + final FlowCreditManager_0_10 creditManager, + final FilterManager filterManager, + final Map<String,Object> arguments) + { + return new Subscription_0_10(session, destination, acceptMode, acquireMode, + flowMode, creditManager, filterManager, arguments, getNextSubscriptionId()); + } public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl(); + + private static long getNextSubscriptionId() + { + return SUB_ID_GENERATOR.getAndIncrement(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index d8f44c9f7f..b791596054 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -88,9 +88,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private final Lock _stateChangeLock; - private static final AtomicLong idGenerator = new AtomicLong(0); - // Create a simple ID that increments for ever new Subscription - private final long _subscriptionID = idGenerator.getAndIncrement(); + private final long _subscriptionID; private LogSubject _logSubject; private LogActor _logActor; private UUID _id; @@ -104,10 +102,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) + RecordDeliveryMethod recordMethod, + long subscriptionID) throws AMQException { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); } @@ -151,10 +150,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) + RecordDeliveryMethod recordMethod, + long subscriptionID) throws AMQException { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); } @@ -211,16 +211,45 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } + /** + * NoAck Subscription for use with BasicGet method. + */ + public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription + { + public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, + AMQShortString consumerTag, FieldTable filters, + boolean noLocal, FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod, + long subscriptionID) + throws AMQException + { + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); + } + + public boolean isTransient() + { + return true; + } + + public boolean wouldSuspend(QueueEntry msg) + { + return !getCreditManager().useCreditForMessage(msg.getMessage()); + } + + } + static final class AckSubscription extends SubscriptionImpl { public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) + RecordDeliveryMethod recordMethod, + long subscriptionID) throws AMQException { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); } @@ -296,10 +325,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage AMQShortString consumerTag, FieldTable arguments, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) + RecordDeliveryMethod recordMethod, + long subscriptionID) throws AMQException { - + _subscriptionID = subscriptionID; _channel = channel; _consumerTag = consumerTag; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 68e47fd86a..309879c17c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -40,7 +40,6 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.SubscriptionActor; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.AMQMessage; @@ -80,10 +79,7 @@ import java.nio.ByteBuffer; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { - - private static final AtomicLong idGenerator = new AtomicLong(0); - // Create a simple ID that increments for ever new Subscription - private final long _subscriptionID = idGenerator.getAndIncrement(); + private final long _subscriptionID; private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this); @@ -114,7 +110,6 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private MessageFlowMode _flowMode; private final ServerSession _session; private AtomicBoolean _stopped = new AtomicBoolean(true); - private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>(); private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; private LogActor _logActor; @@ -131,8 +126,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr MessageAcquireMode acquireMode, MessageFlowMode flowMode, FlowCreditManager_0_10 creditManager, - FilterManager filters,Map<String, Object> arguments) + FilterManager filters,Map<String, Object> arguments, long subscriptionId) { + _subscriptionID = subscriptionId; _session = session; _destination = destination; _acceptMode = acceptMode; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 19c11080b8..f65cad23e7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -74,7 +74,7 @@ public class ServerConnectionDelegate extends ServerDelegate public ServerSession getSession(Connection conn, SessionAttach atc) { - SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry); + SessionDelegate serverSessionDelegate = new ServerSessionDelegate(); ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 0f4147ebb4..65790e2e6f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -129,16 +129,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig()); } - protected void setState(State state) - { - super.setState(state); - - if (state == State.OPEN) - { - _actor.message(ChannelMessages.CREATE()); - } - } - public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig) { super(connection, delegate, name, expiry); @@ -150,6 +140,16 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi getConfigStore().addConfiguredObject(this); } + protected void setState(State state) + { + super.setState(state); + + if (state == State.OPEN) + { + _actor.message(ChannelMessages.CREATE()); + } + } + private ConfigStore getConfigStore() { return getConnectionConfig().getConfigStore(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index f316d60c6a..f89e86e433 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -47,11 +47,11 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Acquired; @@ -98,11 +98,10 @@ import org.apache.qpid.transport.TxSelect; public class ServerSessionDelegate extends SessionDelegate { private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class); - private final IApplicationRegistry _appRegistry; - public ServerSessionDelegate(IApplicationRegistry appRegistry) + public ServerSessionDelegate() { - _appRegistry = appRegistry; + } @Override @@ -254,7 +253,7 @@ public class ServerSessionDelegate extends SessionDelegate return; } - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, + Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session, destination, method.getAcceptMode(), method.getAcquireMode(), diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java new file mode 100644 index 0000000000..29f45bf7f4 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.protocol.ProtocolEngine_0_10; +import org.apache.qpid.server.transport.ServerConnection; +import org.apache.qpid.server.transport.ServerSession; +import org.apache.qpid.server.transport.ServerSessionDelegate; +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.TestNetworkConnection; + +public class SubscriptionFactoryImplTest extends InternalBrokerBaseCase +{ + /** + * Tests that while creating Subscriptions of various types, the + * ID numbers assigned are allocated from a common sequence + * (in increasing order). + */ + public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception + { + //create a No-Ack subscription, get the first Subscription ID + long previousId = 0; + Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), false, null, false, getChannel().getCreditManager()); + previousId = noAckSub.getSubscriptionID(); + + //create an ack subscription, verify the next Subscription ID is used + Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), true, null, false, getChannel().getCreditManager()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID()); + previousId = ackSub.getSubscriptionID(); + + //create a browser subscription + FieldTable filters = new FieldTable(); + filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); + Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, getSession(), new AMQShortString("1"), true, null, false, getChannel().getCreditManager()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID()); + previousId = browerSub.getSubscriptionID(); + + //create an BasicGet NoAck subscription + Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(getChannel(), getSession(), new AMQShortString("1"), null, false, + getChannel().getCreditManager(),getChannel().getClientDeliveryMethod(), getChannel().getRecordDeliveryMethod()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID()); + previousId = getNoAckSub.getSubscriptionID(); + + //create a 0-10 subscription + ServerConnection conn = new ServerConnection(1); + ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), getRegistry()); + conn.setVirtualHost(getVirtualHost()); + conn.setConnectionConfig(engine); + ServerSessionDelegate sesDel = new ServerSessionDelegate(); + Binary name = new Binary(new byte[]{new Byte("1")}); + ServerSession session = new ServerSession(conn, sesDel, name, 0, engine); + + Subscription sub_0_10 = SubscriptionFactoryImpl.INSTANCE.createSubscription(session, "1", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(), null, null); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, sub_0_10.getSubscriptionID()); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index ff94942457..a97134a58d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -44,14 +44,13 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.util.MockChannel; public class InternalBrokerBaseCase extends QpidTestCase { private IApplicationRegistry _registry; private MessageStore _messageStore; - private MockChannel _channel; + private AMQChannel _channel; private InternalTestProtocolSession _session; private VirtualHost _virtualHost; private AMQQueue _queue; @@ -111,7 +110,7 @@ public class InternalBrokerBaseCase extends QpidTestCase _session = new InternalTestProtocolSession(_virtualHost); CurrentActor.set(_session.getLogActor()); - _channel = new MockChannel(_session, 1, _messageStore); + _channel = new AMQChannel(_session, 1, _messageStore); _session.addChannel(_channel); } @@ -283,12 +282,12 @@ public class InternalBrokerBaseCase extends QpidTestCase _messageStore = messageStore; } - public MockChannel getChannel() + public AMQChannel getChannel() { return _channel; } - public void setChannel(MockChannel channel) + public void setChannel(AMQChannel channel) { _channel = channel; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java b/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java deleted file mode 100644 index 9bd1e7c5e1..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; - -public class MockChannel extends AMQChannel -{ - public MockChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) - throws AMQException - { - super(session, channelId, messageStore); - } - - - -} |
