summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-01-29 12:13:04 +0000
committerGordon Sim <gsim@apache.org>2007-01-29 12:13:04 +0000
commita327ecc9225fb303f7fc1305d0e135f331dc7bce (patch)
tree9c15917d217dd3e45ad7df25e09c7a07b0b4349d /java
parent81816f0a27c9115689a0e1f4c4b0b5bef6f71590 (diff)
downloadqpid-python-a327ecc9225fb303f7fc1305d0e135f331dc7bce.tar.gz
Fixes to get the python queue tests to work.
(NB: currently, auto-delete is not in so tests that re-use the same exclusive queue conflict with each other) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501021 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java27
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java81
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java96
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java2
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java2
15 files changed, 246 insertions, 42 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 5b6bd24faf..3c2c44d422 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -375,7 +375,7 @@ public class AMQChannel
* @throws AMQException if something goes wrong
*/
public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -386,7 +386,7 @@ public class AMQChannel
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal);
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index e3c1b11162..a0105005b9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -54,20 +54,18 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
{
ConnectionOpenBody body = evt.getMethod();
- String contextKey = body.virtualHost;
//todo //FIXME The virtual host must be validated by the server for the connection to open-ok
// See Spec (0.8.2). Section 3.1.2 Virtual Hosts
- if (contextKey == null)
+ if (protocolSession.getContextKey() == null)
{
- contextKey = generateClientID();
+ protocolSession.setContextKey(generateClientID());
}
- protocolSession.setContextKey(contextKey);
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody response = ConnectionOpenOkBody.createMethodBody(
protocolSession.getMajor(), // AMQP major version
protocolSession.getMinor(), // AMQP minor version
- contextKey); // knownHosts
+ body.virtualHost); // knownHosts
protocolSession.getStateManager().changeState(AMQState.CONNECTION_OPEN);
protocolSession.writeResponse(evt, response);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
index 61da80a2d1..83121e7977 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
@@ -84,7 +84,7 @@ public class MessageConsumeHandler implements StateAwareMethodListener<MessageCo
try
{
/*AMQShort*/String destination = channel.subscribeToQueue
- (body.destination, queue, session, !body.noAck, /*XXX*/null, body.noLocal);
+ (body.destination, queue, session, !body.noAck, /*XXX*/null, body.noLocal, body.exclusive);
// Be aware of possible changes to parameter order as versions change.
session.writeResponse(evt, MessageOkBody.createMethodBody(
session.getMajor(), // AMQP major version
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index a3811134cb..85fc8290e0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
@@ -77,26 +78,45 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+ AMQQueue queue = null;
QueueRegistry queueRegistry = protocolSession.getQueueRegistry();
synchronized (queueRegistry)
{
- AMQQueue queue;
if ((queue = queueRegistry.getQueue(body.queue)) == null)
{
- queue = createQueue(body, queueRegistry, protocolSession);
- if (queue.isDurable() && !queue.isAutoDelete())
+ if(body.passive)
{
- _store.createQueue(queue);
+ String msg = "Queue: " + body.queue + " not found.";
+ throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg );
+
}
- queueRegistry.registerQueue(queue);
- if (autoRegister)
+ else
{
- Exchange defaultExchange = protocolSession.getExchangeRegistry().getExchange("amq.direct");
- defaultExchange.registerQueue(body.queue, queue, null);
- queue.bind(body.queue, defaultExchange);
- _log.info("Queue " + body.queue + " bound to default exchange");
+ queue = createQueue(body, queueRegistry, protocolSession);
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ _store.createQueue(queue);
+ }
+ queueRegistry.registerQueue(queue);
+ if (autoRegister)
+ {
+ Exchange defaultExchange = protocolSession.getExchangeRegistry().getExchange("amq.direct");
+ defaultExchange.registerQueue(body.queue, queue, null);
+ queue.bind(body.queue, defaultExchange);
+ _log.info("Queue " + body.queue + " bound to default exchange");
+ }
}
}
+ else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner()))
+ {
+ // todo - constant
+ throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");
+
+ }
+ else
+ {
+ _log.info("Queue " + body.queue + " exists and is accesible to this connection [owner=" + queue.getOwner() +"]");
+ }
//set this as the default queue on the channel:
protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue);
}
@@ -106,8 +126,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
AMQMethodBody response = QueueDeclareOkBody.createMethodBody(
protocolSession.getMajor(), // AMQP major version
protocolSession.getMinor(), // AMQP minor version
- 0L, // consumerCount
- 0L, // messageCount
+ queue.getConsumerCount(), // consumerCount
+ queue.getMessageCount(), // messageCount
body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
protocolSession.writeResponse(evt, response);
@@ -128,6 +148,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
throws AMQException
{
String owner = body.exclusive ? session.getContextKey() : null;
+ if (owner != null) _log.info("Queue " + body.queue + " is owned by " + owner);
return new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 7e62c24e82..8ada41572c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -79,13 +79,26 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
}
else
{
- int purged = queue.delete(body.ifUnused, body.ifEmpty);
- _store.removeQueue(queue.getName());
- // Be aware of possible changes to parameter order as versions change.
- session.writeResponse(evt, QueueDeleteOkBody.createMethodBody(
- session.getMajor(), // AMQP major version
- session.getMinor(), // AMQP minor version
- purged)); // messageCount
+ if(body.ifEmpty && !queue.isEmpty())
+ {
+ throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." );
+ }
+ else if(body.ifUnused && !queue.isUnused())
+ {
+ // TODO - Error code
+ throw body.getChannelException(406, "Queue: " + body.queue + " is still used." );
+
+ }
+ else
+ {
+ int purged = queue.delete(body.ifUnused, body.ifEmpty);
+ _store.removeQueue(queue.getName());
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeResponse(evt, QueueDeleteOkBody.createMethodBody(
+ session.getMajor(), // AMQP major version
+ session.getMinor(), // AMQP minor version
+ purged)); // messageCount
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
new file mode 100644
index 0000000000..0cec10cc1e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -0,0 +1,81 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
+{
+ private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
+
+ public static QueuePurgeHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private final boolean _failIfNotFound;
+
+ public QueuePurgeHandler()
+ {
+ this(true);
+ }
+
+ public QueuePurgeHandler(boolean failIfNotFound)
+ {
+ _failIfNotFound = failIfNotFound;
+ }
+
+ public void methodReceived(AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ {
+ QueueRegistry queueRegistry = session.getQueueRegistry();
+
+ QueuePurgeBody body = evt.getMethod();
+ AMQQueue queue;
+ if(body.queue == null)
+ {
+ queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+ if(queue == null)
+ {
+ if(_failIfNotFound)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified.");
+ }
+
+ }
+ }
+ else
+ {
+ queue = queueRegistry.getQueue(body.queue);
+ }
+
+ if(queue == null)
+ {
+ if(_failIfNotFound)
+ {
+ throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+ }
+ }
+ else
+ {
+ long purged = queue.clearQueue();
+
+
+ if(!body.nowait)
+ {
+ AMQMethodBody response
+ = QueuePurgeOkBody.createMethodBody(session.getMajor(), session.getMinor(), purged);
+ session.writeResponse(evt, response);
+
+ }
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index d12416437d..1d4f67000e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -49,6 +49,7 @@ import org.apache.qpid.framing.RequestManager;
import org.apache.qpid.framing.ResponseManager;
import org.apache.qpid.framing.RequestResponseMappingException;
import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -82,6 +83,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+ private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+
private final IoSession _minaProtocolSession;
private String _contextKey;
@@ -666,6 +669,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void setClientProperties(FieldTable clientProperties)
{
_clientProperties = clientProperties;
+ if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
+ {
+ setContextKey(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE));
+ }
}
public QueueRegistry getQueueRegistry()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 101a2833a0..a3c4fb1820 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -34,6 +34,8 @@ import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
@@ -41,6 +43,28 @@ import java.util.concurrent.Executor;
*/
public class AMQQueue implements Managable, Comparable
{
+ public static final class ExistingExclusiveSubscription extends AMQException
+ {
+
+ public ExistingExclusiveSubscription()
+ {
+ super("");
+ }
+ }
+
+ public static final class ExistingSubscriptionPreventsExclusive extends AMQException
+ {
+
+ public ExistingSubscriptionPreventsExclusive()
+ {
+ super("");
+ }
+ }
+
+ private static final ExistingExclusiveSubscription EXISTING_EXCLUSIVE = new ExistingExclusiveSubscription();
+ private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive();
+
+
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
private final String _name;
@@ -64,6 +88,10 @@ public class AMQQueue implements Managable, Comparable
private final SubscriptionFactory _subscriptionFactory;
+ private final AtomicInteger _subscriberCount = new AtomicInteger();
+
+ private final AtomicBoolean _isExclusive = new AtomicBoolean();
+
/**
* Manages message delivery.
*/
@@ -352,9 +380,9 @@ public class AMQQueue implements Managable, Comparable
/**
* removes all the messages from the queue.
*/
- public void clearQueue() throws AMQException
+ public long clearQueue() throws AMQException
{
- _deliveryMgr.clearAllMessages();
+ return _deliveryMgr.clearAllMessages();
}
public void bind(String routingKey, Exchange exchange)
@@ -362,14 +390,29 @@ public class AMQQueue implements Managable, Comparable
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException
- {
- registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
- }
-
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks,
+ FieldTable filters, boolean noLocal, boolean exclusive)
throws AMQException
{
+ if(incrementSubscriberCount() > 1)
+ {
+ if(isExclusive())
+ {
+ decrementSubscriberCount();
+ throw EXISTING_EXCLUSIVE;
+ }
+ else if(exclusive)
+ {
+ decrementSubscriberCount();
+ throw EXISTING_SUBSCRIPTION;
+ }
+
+ }
+ else if(exclusive)
+ {
+ setExclusive(true);
+ }
+
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
@@ -378,13 +421,33 @@ public class AMQQueue implements Managable, Comparable
{
if (_deliveryMgr.hasQueuedMessages())
{
- _deliveryMgr.populatePreDeliveryQueue(subscription);
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
}
}
_subscribers.addSubscriber(subscription);
}
+ private boolean isExclusive()
+ {
+ return _isExclusive.get();
+ }
+
+ private void setExclusive(boolean exclusive)
+ {
+ _isExclusive.set(exclusive);
+ }
+
+ private int incrementSubscriberCount()
+ {
+ return _subscriberCount.incrementAndGet();
+ }
+
+ private int decrementSubscriberCount()
+ {
+ return _subscriberCount.decrementAndGet();
+ }
+
public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException
{
debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
@@ -400,6 +463,10 @@ public class AMQQueue implements Managable, Comparable
" and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
+ setExclusive(false);
+ decrementSubscriberCount();
+
+
// if we are eligible for auto deletion, unregister from the queue registry
if (_autoDelete && _subscribers.isEmpty())
{
@@ -410,6 +477,17 @@ public class AMQQueue implements Managable, Comparable
}
}
+ public boolean isUnused()
+ {
+ return _subscribers.isEmpty();
+ }
+
+ public boolean isEmpty()
+ {
+ return !_deliveryMgr.hasQueuedMessages();
+ }
+
+
public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
{
if (checkUnused && !_subscribers.isEmpty())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
index 2e49a9d4b1..560b881ee3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
@@ -217,7 +217,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager
}
}
- public synchronized void clearAllMessages() throws AMQException
+ public synchronized long clearAllMessages() throws AMQException
{
AMQMessage msg = poll();
while (msg != null)
@@ -225,6 +225,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager
msg.dequeue(_queue);
msg = poll();
}
+ return 0;
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 74f69030e0..5b3bf2bbeb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -176,14 +176,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- public synchronized void clearAllMessages() throws AMQException
+ public synchronized long clearAllMessages() throws AMQException
{
+ long count = 0;
AMQMessage msg = poll();
while (msg != null)
{
msg.dequeue(_queue);
+ count++;
msg = poll();
}
+ return count;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index cac499587f..936afcde10 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -70,7 +70,7 @@ interface DeliveryManager
void removeAMessageFromTop() throws AMQException;
- void clearAllMessages() throws AMQException;
+ long clearAllMessages() throws AMQException;
List<AMQMessage> getMessages();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
index c967ea2cde..54cd3013ea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
@@ -136,7 +136,7 @@ class SynchronizedDeliveryManager implements DeliveryManager
}
}
- public synchronized void clearAllMessages() throws AMQException
+ public synchronized long clearAllMessages() throws AMQException
{
AMQMessage msg = poll();
while (msg != null)
@@ -144,6 +144,7 @@ class SynchronizedDeliveryManager implements DeliveryManager
msg.dequeue(_queue);
msg = poll();
}
+ return 0;
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 54be424d50..6f158c89d6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -134,6 +134,7 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance());
frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance());
frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance());
+ frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance());
frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance());
frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance());
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index a0d243ca30..8ea0d6ef1a 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -92,7 +92,7 @@ public final class AMQConstant
public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
- public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
+ public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true);
public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 7c5de67c5e..4352f56d90 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -70,7 +70,7 @@ public class AMQQueueMBeanTest extends TestCase
_channel = new AMQChannel(1,_protocolSession, _messageStore, null,null);
_protocolSession.addChannel(_channel);
- _queue.registerProtocolSession(_protocolSession, 1, "test", false, null);
+ _queue.registerProtocolSession(_protocolSession, 1, "test", false, null, false, false);
assertTrue(_queueMBean.getActiveConsumerCount() == 1);
SubscriptionSet _subscribers = (SubscriptionSet) mgr;