diff options
| author | Gordon Sim <gsim@apache.org> | 2007-01-29 12:13:04 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-01-29 12:13:04 +0000 |
| commit | a327ecc9225fb303f7fc1305d0e135f331dc7bce (patch) | |
| tree | 9c15917d217dd3e45ad7df25e09c7a07b0b4349d /java | |
| parent | 81816f0a27c9115689a0e1f4c4b0b5bef6f71590 (diff) | |
| download | qpid-python-a327ecc9225fb303f7fc1305d0e135f331dc7bce.tar.gz | |
Fixes to get the python queue tests to work.
(NB: currently, auto-delete is not in so tests that re-use the same exclusive queue conflict with each other)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501021 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
15 files changed, 246 insertions, 42 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 5b6bd24faf..3c2c44d422 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -375,7 +375,7 @@ public class AMQChannel * @throws AMQException if something goes wrong */ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, - FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -386,7 +386,7 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal); + queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); _consumerTag2QueueMap.put(tag, queue); return tag; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index e3c1b11162..a0105005b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -54,20 +54,18 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException { ConnectionOpenBody body = evt.getMethod(); - String contextKey = body.virtualHost; //todo //FIXME The virtual host must be validated by the server for the connection to open-ok // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (contextKey == null) + if (protocolSession.getContextKey() == null) { - contextKey = generateClientID(); + protocolSession.setContextKey(generateClientID()); } - protocolSession.setContextKey(contextKey); // Be aware of possible changes to parameter order as versions change. AMQMethodBody response = ConnectionOpenOkBody.createMethodBody( protocolSession.getMajor(), // AMQP major version protocolSession.getMinor(), // AMQP minor version - contextKey); // knownHosts + body.virtualHost); // knownHosts protocolSession.getStateManager().changeState(AMQState.CONNECTION_OPEN); protocolSession.writeResponse(evt, response); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java index 61da80a2d1..83121e7977 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java @@ -84,7 +84,7 @@ public class MessageConsumeHandler implements StateAwareMethodListener<MessageCo try { /*AMQShort*/String destination = channel.subscribeToQueue - (body.destination, queue, session, !body.noAck, /*XXX*/null, body.noLocal); + (body.destination, queue, session, !body.noAck, /*XXX*/null, body.noLocal, body.exclusive); // Be aware of possible changes to parameter order as versions change. session.writeResponse(evt, MessageOkBody.createMethodBody( session.getMajor(), // AMQP major version diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index a3811134cb..85fc8290e0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; @@ -77,26 +78,45 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } //TODO: do we need to check that the queue already exists with exactly the same "configuration"? + AMQQueue queue = null; QueueRegistry queueRegistry = protocolSession.getQueueRegistry(); synchronized (queueRegistry) { - AMQQueue queue; if ((queue = queueRegistry.getQueue(body.queue)) == null) { - queue = createQueue(body, queueRegistry, protocolSession); - if (queue.isDurable() && !queue.isAutoDelete()) + if(body.passive) { - _store.createQueue(queue); + String msg = "Queue: " + body.queue + " not found."; + throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg ); + } - queueRegistry.registerQueue(queue); - if (autoRegister) + else { - Exchange defaultExchange = protocolSession.getExchangeRegistry().getExchange("amq.direct"); - defaultExchange.registerQueue(body.queue, queue, null); - queue.bind(body.queue, defaultExchange); - _log.info("Queue " + body.queue + " bound to default exchange"); + queue = createQueue(body, queueRegistry, protocolSession); + if (queue.isDurable() && !queue.isAutoDelete()) + { + _store.createQueue(queue); + } + queueRegistry.registerQueue(queue); + if (autoRegister) + { + Exchange defaultExchange = protocolSession.getExchangeRegistry().getExchange("amq.direct"); + defaultExchange.registerQueue(body.queue, queue, null); + queue.bind(body.queue, defaultExchange); + _log.info("Queue " + body.queue + " bound to default exchange"); + } } } + else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner())) + { + // todo - constant + throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection"); + + } + else + { + _log.info("Queue " + body.queue + " exists and is accesible to this connection [owner=" + queue.getOwner() +"]"); + } //set this as the default queue on the channel: protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue); } @@ -106,8 +126,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar AMQMethodBody response = QueueDeclareOkBody.createMethodBody( protocolSession.getMajor(), // AMQP major version protocolSession.getMinor(), // AMQP minor version - 0L, // consumerCount - 0L, // messageCount + queue.getConsumerCount(), // consumerCount + queue.getMessageCount(), // messageCount body.queue); // queue _log.info("Queue " + body.queue + " declared successfully"); protocolSession.writeResponse(evt, response); @@ -128,6 +148,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar throws AMQException { String owner = body.exclusive ? session.getContextKey() : null; + if (owner != null) _log.info("Queue " + body.queue + " is owned by " + owner); return new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 7e62c24e82..8ada41572c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -79,13 +79,26 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete } else { - int purged = queue.delete(body.ifUnused, body.ifEmpty); - _store.removeQueue(queue.getName()); - // Be aware of possible changes to parameter order as versions change. - session.writeResponse(evt, QueueDeleteOkBody.createMethodBody( - session.getMajor(), // AMQP major version - session.getMinor(), // AMQP minor version - purged)); // messageCount + if(body.ifEmpty && !queue.isEmpty()) + { + throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." ); + } + else if(body.ifUnused && !queue.isUnused()) + { + // TODO - Error code + throw body.getChannelException(406, "Queue: " + body.queue + " is still used." ); + + } + else + { + int purged = queue.delete(body.ifUnused, body.ifEmpty); + _store.removeQueue(queue.getName()); + // Be aware of possible changes to parameter order as versions change. + session.writeResponse(evt, QueueDeleteOkBody.createMethodBody( + session.getMajor(), // AMQP major version + session.getMinor(), // AMQP minor version + purged)); // messageCount + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java new file mode 100644 index 0000000000..0cec10cc1e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -0,0 +1,81 @@ +package org.apache.qpid.server.handler; + +import org.apache.qpid.framing.*; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; + +public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody> +{ + private static final QueuePurgeHandler _instance = new QueuePurgeHandler(); + + public static QueuePurgeHandler getInstance() + { + return _instance; + } + + private final boolean _failIfNotFound; + + public QueuePurgeHandler() + { + this(true); + } + + public QueuePurgeHandler(boolean failIfNotFound) + { + _failIfNotFound = failIfNotFound; + } + + public void methodReceived(AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException + { + QueueRegistry queueRegistry = session.getQueueRegistry(); + + QueuePurgeBody body = evt.getMethod(); + AMQQueue queue; + if(body.queue == null) + { + queue = session.getChannel(evt.getChannelId()).getDefaultQueue(); + if(queue == null) + { + if(_failIfNotFound) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified."); + } + + } + } + else + { + queue = queueRegistry.getQueue(body.queue); + } + + if(queue == null) + { + if(_failIfNotFound) + { + throw body.getChannelException(404, "Queue " + body.queue + " does not exist."); + } + } + else + { + long purged = queue.clearQueue(); + + + if(!body.nowait) + { + AMQMethodBody response + = QueuePurgeOkBody.createMethodBody(session.getMajor(), session.getMinor(), purged); + session.writeResponse(evt, response); + + } + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index d12416437d..1d4f67000e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -49,6 +49,7 @@ import org.apache.qpid.framing.RequestManager; import org.apache.qpid.framing.ResponseManager; import org.apache.qpid.framing.RequestResponseMappingException; import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.protocol.AMQMethodEvent; @@ -82,6 +83,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); + private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + private final IoSession _minaProtocolSession; private String _contextKey; @@ -666,6 +669,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void setClientProperties(FieldTable clientProperties) { _clientProperties = clientProperties; + if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)) + { + setContextKey(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)); + } } public QueueRegistry getQueueRegistry() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 101a2833a0..a3c4fb1820 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -34,6 +34,8 @@ import javax.management.JMException; import java.text.MessageFormat; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like @@ -41,6 +43,28 @@ import java.util.concurrent.Executor; */ public class AMQQueue implements Managable, Comparable { + public static final class ExistingExclusiveSubscription extends AMQException + { + + public ExistingExclusiveSubscription() + { + super(""); + } + } + + public static final class ExistingSubscriptionPreventsExclusive extends AMQException + { + + public ExistingSubscriptionPreventsExclusive() + { + super(""); + } + } + + private static final ExistingExclusiveSubscription EXISTING_EXCLUSIVE = new ExistingExclusiveSubscription(); + private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive(); + + private static final Logger _logger = Logger.getLogger(AMQQueue.class); private final String _name; @@ -64,6 +88,10 @@ public class AMQQueue implements Managable, Comparable private final SubscriptionFactory _subscriptionFactory; + private final AtomicInteger _subscriberCount = new AtomicInteger(); + + private final AtomicBoolean _isExclusive = new AtomicBoolean(); + /** * Manages message delivery. */ @@ -352,9 +380,9 @@ public class AMQQueue implements Managable, Comparable /** * removes all the messages from the queue. */ - public void clearQueue() throws AMQException + public long clearQueue() throws AMQException { - _deliveryMgr.clearAllMessages(); + return _deliveryMgr.clearAllMessages(); } public void bind(String routingKey, Exchange exchange) @@ -362,14 +390,29 @@ public class AMQQueue implements Managable, Comparable _bindings.addBinding(routingKey, exchange); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException - { - registerProtocolSession(ps, channel, consumerTag, acks, filters, false); - } - - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) + public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException { + if(incrementSubscriberCount() > 1) + { + if(isExclusive()) + { + decrementSubscriberCount(); + throw EXISTING_EXCLUSIVE; + } + else if(exclusive) + { + decrementSubscriberCount(); + throw EXISTING_SUBSCRIPTION; + } + + } + else if(exclusive) + { + setExclusive(true); + } + debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); @@ -378,13 +421,33 @@ public class AMQQueue implements Managable, Comparable { if (_deliveryMgr.hasQueuedMessages()) { - _deliveryMgr.populatePreDeliveryQueue(subscription); + _deliveryMgr.populatePreDeliveryQueue(subscription); } } _subscribers.addSubscriber(subscription); } + private boolean isExclusive() + { + return _isExclusive.get(); + } + + private void setExclusive(boolean exclusive) + { + _isExclusive.set(exclusive); + } + + private int incrementSubscriberCount() + { + return _subscriberCount.incrementAndGet(); + } + + private int decrementSubscriberCount() + { + return _subscriberCount.decrementAndGet(); + } + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException { debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, @@ -400,6 +463,10 @@ public class AMQQueue implements Managable, Comparable " and protocol session key " + ps.getKey() + " not registered with queue " + this); } + setExclusive(false); + decrementSubscriberCount(); + + // if we are eligible for auto deletion, unregister from the queue registry if (_autoDelete && _subscribers.isEmpty()) { @@ -410,6 +477,17 @@ public class AMQQueue implements Managable, Comparable } } + public boolean isUnused() + { + return _subscribers.isEmpty(); + } + + public boolean isEmpty() + { + return !_deliveryMgr.hasQueuedMessages(); + } + + public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException { if (checkUnused && !_subscribers.isEmpty()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java index 2e49a9d4b1..560b881ee3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java @@ -217,7 +217,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager } } - public synchronized void clearAllMessages() throws AMQException + public synchronized long clearAllMessages() throws AMQException { AMQMessage msg = poll(); while (msg != null) @@ -225,6 +225,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager msg.dequeue(_queue); msg = poll(); } + return 0; } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 74f69030e0..5b3bf2bbeb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -176,14 +176,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public synchronized void clearAllMessages() throws AMQException + public synchronized long clearAllMessages() throws AMQException { + long count = 0; AMQMessage msg = poll(); while (msg != null) { msg.dequeue(_queue); + count++; msg = poll(); } + return count; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index cac499587f..936afcde10 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -70,7 +70,7 @@ interface DeliveryManager void removeAMessageFromTop() throws AMQException; - void clearAllMessages() throws AMQException; + long clearAllMessages() throws AMQException; List<AMQMessage> getMessages(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java index c967ea2cde..54cd3013ea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java @@ -136,7 +136,7 @@ class SynchronizedDeliveryManager implements DeliveryManager } } - public synchronized void clearAllMessages() throws AMQException + public synchronized long clearAllMessages() throws AMQException { AMQMessage msg = poll(); while (msg != null) @@ -144,6 +144,7 @@ class SynchronizedDeliveryManager implements DeliveryManager msg.dequeue(_queue); msg = poll(); } + return 0; } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 54be424d50..6f158c89d6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -134,6 +134,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance()); frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance()); frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance()); + frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance()); frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance()); frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance()); frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance()); diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index a0d243ca30..8ea0d6ef1a 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -92,7 +92,7 @@ public final class AMQConstant public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true); - public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true); + public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true); public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true); diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 7c5de67c5e..4352f56d90 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -70,7 +70,7 @@ public class AMQQueueMBeanTest extends TestCase _channel = new AMQChannel(1,_protocolSession, _messageStore, null,null); _protocolSession.addChannel(_channel); - _queue.registerProtocolSession(_protocolSession, 1, "test", false, null); + _queue.registerProtocolSession(_protocolSession, 1, "test", false, null, false, false); assertTrue(_queueMBean.getActiveConsumerCount() == 1); SubscriptionSet _subscribers = (SubscriptionSet) mgr; |
