summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-26 17:01:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-26 17:01:07 +0000
commit1635ca5849b7c765d5d7be9cd01d46b06349f320 (patch)
treef304353182e02369661b8ecfdde357a288b183e3 /qpid/java/broker-plugins
parent8328e0398707d2cccdadb95fe1b4c4563b930cc1 (diff)
downloadqpid-python-1635ca5849b7c765d5d7be9cd01d46b06349f320.tar.gz
QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620659 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java66
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java15
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java15
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java11
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java84
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java79
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java64
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java54
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java3
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java8
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java6
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java31
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java6
14 files changed, 295 insertions, 152 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index db7ee54cb2..088c80c219 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -22,7 +22,9 @@ package org.apache.qpid.server.protocol.v0_10;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,6 +39,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -77,7 +80,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final Map<String, Object> _arguments;
private int _deferredMessageCredit;
private long _deferredSizeCredit;
- private ConsumerImpl _consumer;
+ private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
public ConsumerTarget_0_10(ServerSession session,
@@ -101,11 +104,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_name = name;
}
- public ConsumerImpl getConsumer()
- {
- return _consumer;
- }
-
public boolean isSuspended()
{
return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
@@ -116,11 +114,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
boolean closed = false;
State state = getState();
- final ConsumerImpl consumer = getConsumer();
- if(consumer != null)
- {
- consumer.getSendLock();
- }
+ getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -135,10 +129,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
}
finally
{
- if(consumer != null)
- {
- consumer.releaseSendLock();
- }
+ releaseSendLock();
}
return closed;
@@ -153,7 +144,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
if(!updateState(State.SUSPENDED, State.ACTIVE))
{
// this is a hack to get round the issue of increasing bytes credit
- getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ notifyCurrentState();
}
}
else
@@ -200,7 +191,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public long send(final MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
ServerMessage serverMsg = entry.getMessage();
@@ -303,12 +294,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
- xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED)
- : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body);
+ xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, body, BATCHED)
+ : new MessageTransfer(_name,_acceptMode,_acquireMode,header, body);
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
- xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, consumer, _session, entry, _flowMode == MessageFlowMode.WINDOW));
}
else if(_flowMode == MessageFlowMode.WINDOW)
{
@@ -325,11 +316,11 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_postIdSettingAction.setXfr(xfr);
if(_acceptMode == MessageAcceptMode.EXPLICIT)
{
- _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
+ _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer));
}
else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
- _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
+ _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer));
}
else
{
@@ -401,12 +392,23 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
entry.setRedelivered();
entry.routeToAlternate(null, null);
- if(entry.isAcquiredBy(getConsumer()))
+ if(isAcquiredByConsumer(entry))
{
entry.delete();
}
}
+ private boolean isAcquiredByConsumer(final MessageInstance entry)
+ {
+ ConsumerImpl acquiringConsumer = entry.getAcquiringConsumer();
+ if(acquiringConsumer instanceof QueueConsumer)
+ {
+ return ((QueueConsumer)acquiringConsumer).getTarget() == this;
+ }
+
+ return false;
+ }
+
void release(final MessageInstance entry, final boolean setRedelivered)
{
if (setRedelivered)
@@ -503,7 +505,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
try
{
- getConsumer().getSendLock();
+ getSendLock();
updateState(State.ACTIVE, State.SUSPENDED);
_stopped.set(true);
@@ -512,7 +514,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
}
finally
{
- getConsumer().releaseSendLock();
+ releaseSendLock();
}
}
@@ -572,7 +574,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
public boolean deleteAcquired(MessageInstance entry)
{
- if(entry.isAcquiredBy(getConsumer()))
+ if(isAcquiredByConsumer(entry))
{
acquisitionRemoved(entry);
entry.delete();
@@ -594,7 +596,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
public void flush()
{
flushCreditState(true);
- getConsumer().flush();
+ for(ConsumerImpl consumer : _consumers)
+ {
+ consumer.flush();
+ }
stop();
}
@@ -626,12 +631,17 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
@Override
public void consumerAdded(final ConsumerImpl sub)
{
- _consumer = sub;
+ _consumers.add(sub);
}
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
+ _consumers.remove(sub);
+ if(_consumers.isEmpty())
+ {
+ close();
+ }
}
public long getUnacknowledgedBytes()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 95dba11ea0..66f8c97063 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -32,16 +33,20 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private final MessageInstance _entry;
private final ConsumerTarget_0_10 _target;
+ private final ConsumerImpl _consumer;
- public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
+ public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
+ ConsumerTarget_0_10 target,
+ final ConsumerImpl consumer)
{
_entry = entry;
_target = target;
+ _consumer = consumer;
}
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition())
+ if(_target != null && _entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
{
_target.getSessionModel().acknowledge(_target, _entry);
}
@@ -54,7 +59,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+ if(_target != null && _entry.isAcquiredBy(_consumer))
{
_target.release(_entry, setRedelivered);
}
@@ -66,7 +71,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+ if(_target != null && _entry.isAcquiredBy(_consumer))
{
_target.reject(_entry);
}
@@ -79,7 +84,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- return _entry.acquire(_target.getConsumer());
+ return _entry.acquire(_consumer);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
index c459364dbb..5467a57fa1 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -30,12 +31,16 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private final MessageInstance _entry;
- private ConsumerTarget_0_10 _target;
+ private final ConsumerTarget_0_10 _target;
+ private final ConsumerImpl _consumer;
- public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
+ public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
+ ConsumerTarget_0_10 target,
+ final ConsumerImpl consumer)
{
_entry = entry;
_target = target;
+ _consumer = consumer;
}
public void onAccept()
@@ -45,7 +50,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_target.getConsumer()))
+ if(_entry.isAcquiredBy(_consumer))
{
_target.release(_entry, setRedelivered);
}
@@ -57,7 +62,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- if(_entry.isAcquiredBy(_target.getConsumer()))
+ if(_entry.isAcquiredBy(_consumer))
{
_target.reject(_entry);
}
@@ -70,7 +75,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- boolean acquired = _entry.acquire(_target.getConsumer());
+ boolean acquired = _entry.acquire(_consumer);
if(acquired)
{
_target.recordUnacknowledged(_entry);
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index 7917b7989a..d581d146a8 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.transport.Method;
@@ -29,16 +30,22 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
+ private final ConsumerImpl _consumer;
private long _messageSize;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub,
+ final ConsumerImpl consumer,
+ ServerSession session,
+ MessageInstance entry,
+ boolean restoreCredit)
{
super();
_sub = sub;
_entry = entry;
_session = session;
_restoreCredit = restoreCredit;
+ _consumer = consumer;
if(restoreCredit)
{
_messageSize = entry.getMessage().getSize();
@@ -51,7 +58,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
{
_sub.getCreditManager().restoreCredit(1l, _messageSize);
}
- if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition())
+ if(_entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
{
_session.acknowledge(_sub, _entry);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 5adeba66b1..14082091f9 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -25,6 +25,7 @@ import java.security.AccessControlException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -198,15 +199,44 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
String queueName = method.getQueue();
- VirtualHostImpl vhost = getVirtualHost(session);
+ VirtualHostImpl<?,?,?> vhost = getVirtualHost(session);
+ final Collection<MessageSource> sources = new HashSet<>();
final MessageSource queue = vhost.getMessageSource(queueName);
+ if(queue != null)
+ {
+ sources.add(queue);
+ }
+ else if(vhost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && method.getArguments() != null
+ && method.getArguments().get("x-multiqueue") instanceof Collection)
+ {
+ for(Object object : (Collection<Object>)method.getArguments().get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if(sourceName.length() != 0)
+ {
+ MessageSource source = vhost.getMessageSource(sourceName);
+ if(source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = method.getArguments().get("x-multiqueue").toString();
+ }
- if(queue == null)
+ if(sources.isEmpty())
{
- exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(!queue.verifySessionAccess((ServerSession)session))
+ else if(!verifySessionAccess((ServerSession) session, sources))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -250,12 +280,15 @@ public class ServerSessionDelegate extends SessionDelegate
{
options.add(ConsumerImpl.Option.EXCLUSIVE);
}
- ((ServerSession)session).register(
- queue.addConsumer(target,
- filterManager,
- MessageTransferMessage.class,
- destination,
- options));
+ for(MessageSource source : sources)
+ {
+ ((ServerSession) session).register(
+ source.addConsumer(target,
+ filterManager,
+ MessageTransferMessage.class,
+ destination,
+ options));
+ }
}
catch (AMQQueue.ExistingExclusiveConsumer existing)
{
@@ -278,6 +311,23 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
+ protected boolean verifySessionAccess(final ServerSession session, final Collection<MessageSource> queues)
+ {
+ for(MessageSource source : queues)
+ {
+ if(!verifySessionAccess(session, source))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected boolean verifySessionAccess(final ServerSession session, final MessageSource queue)
+ {
+ return queue.verifySessionAccess(session);
+ }
+
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
@@ -820,17 +870,15 @@ public class ServerSessionDelegate extends SessionDelegate
return destination;
}
- private VirtualHostImpl getVirtualHost(Session session)
+ private VirtualHostImpl<?,?,?> getVirtualHost(Session session)
{
ServerConnection conn = getServerConnection(session);
- VirtualHostImpl vhost = conn.getVirtualHost();
- return vhost;
+ return conn.getVirtualHost();
}
private ServerConnection getServerConnection(Session session)
{
- ServerConnection conn = (ServerConnection) session.getConnection();
- return conn;
+ return (ServerConnection) session.getConnection();
}
@Override
@@ -1238,7 +1286,7 @@ public class ServerSessionDelegate extends SessionDelegate
exception(session, method, errorCode, description);
}
- else if (!queue.verifySessionAccess((ServerSession)session))
+ else if (!verifySessionAccess((ServerSession) session, queue))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1296,7 +1344,7 @@ public class ServerSessionDelegate extends SessionDelegate
catch(QueueExistsException qe)
{
queue = qe.getExistingQueue();
- if (!queue.verifySessionAccess((ServerSession)session))
+ if (!verifySessionAccess((ServerSession) session, queue))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1357,7 +1405,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- if(!queue.verifySessionAccess((ServerSession)session))
+ if(!verifySessionAccess((ServerSession) session, queue))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 49bc26149e..d1ec2e139e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -61,6 +61,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -543,10 +544,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
- public ConsumerImpl getSubscription(AMQShortString tag)
+ public ConsumerTarget getSubscription(AMQShortString tag)
{
- final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
- return target == null ? null : target.getConsumer();
+ return _tag2SubscriptionTargetMap.get(tag);
}
/**
@@ -555,7 +555,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
*
*
* @param tag the tag chosen by the client (if null, server will generate one)
- * @param source the queue to subscribe to
+ * @param sources the queues to subscribe to
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
@@ -564,7 +564,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
*
* @throws org.apache.qpid.AMQException if something goes wrong
*/
- public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+ public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
@@ -632,18 +632,21 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
});
}
- ConsumerImpl sub =
- source.addConsumer(target,
- filterManager,
- AMQMessage.class,
- AMQShortString.toString(tag),
- options);
- if(sub instanceof Consumer<?>)
+ for(MessageSource source : sources)
{
- final Consumer<?> modelConsumer = (Consumer<?>) sub;
- consumerAdded(modelConsumer);
- modelConsumer.addChangeListener(_consumerClosedListener);
- _consumers.add(modelConsumer);
+ ConsumerImpl sub =
+ source.addConsumer(target,
+ filterManager,
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options);
+ if (sub instanceof Consumer<?>)
+ {
+ final Consumer<?> modelConsumer = (Consumer<?>) sub;
+ consumerAdded(modelConsumer);
+ modelConsumer.addChangeListener(_consumerClosedListener);
+ _consumers.add(modelConsumer);
+ }
}
}
catch (AccessControlException e)
@@ -683,13 +686,16 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
- ConsumerImpl sub = target == null ? null : target.getConsumer();
- if (sub != null)
+ Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers();
+ if (subs != null)
{
- sub.close();
- if(sub instanceof Consumer<?>)
+ for(ConsumerImpl sub : subs)
{
- _consumers.remove(sub);
+ sub.close();
+ if (sub instanceof Consumer<?>)
+ {
+ _consumers.remove(sub);
+ }
}
return true;
}
@@ -763,11 +769,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- ConsumerImpl sub = me.getValue().getConsumer();
+ Collection<ConsumerImpl> subs = me.getValue().getConsumers();
- if(sub != null)
+ if(subs != null)
{
- sub.close();
+ for(ConsumerImpl sub : subs)
+ {
+ sub.close();
+ }
}
}
@@ -1032,7 +1041,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// may need to deliver queued messages
for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
- s.getConsumer().externalStateChange();
+ for(ConsumerImpl sub : s.getConsumers())
+ {
+ sub.externalStateChange();
+ }
}
}
@@ -1050,11 +1062,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
try
{
- s.getConsumer().getSendLock();
+ s.getSendLock();
}
finally
{
- s.getConsumer().releaseSendLock();
+ s.releaseSendLock();
}
}
}
@@ -1133,8 +1145,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// ensure all subscriptions have seen the change to the channel state
for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getConsumer().getSendLock();
- sub.getConsumer().releaseSendLock();
+ sub.getSendLock();
+ sub.releaseSendLock();
}
try
@@ -1169,9 +1181,12 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
if(requiresSuspend)
{
_suspended.set(false);
- for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
+ for(ConsumerTarget_0_8 target : _tag2SubscriptionTargetMap.values())
{
- sub.getConsumer().externalStateChange();
+ for(ConsumerImpl sub : target.getConsumers())
+ {
+ sub.externalStateChange();
+ }
}
}
@@ -1179,7 +1194,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
public String toString()
{
- return "["+_session.toString()+":"+_channelId+"]";
+ return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]";
}
public void setDefaultQueue(AMQQueue queue)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index d5eed242e7..acb74c99e6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -73,7 +75,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
- private ConsumerImpl _consumer;
+ private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -93,6 +95,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
+ public List<ConsumerImpl> getConsumers()
+ {
+ return _consumers;
+ }
+
static final class BrowserConsumer extends ConsumerTarget_0_8
{
public BrowserConsumer(AMQChannel channel,
@@ -111,12 +118,14 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* thread safe.
*
*
+ *
+ * @param consumer
* @param entry
* @param batch
* @throws org.apache.qpid.AMQException
*/
@Override
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -124,7 +133,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
}
@@ -173,11 +182,12 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
+ * @param consumer
* @param entry The message to send
* @param batch
*/
@Override
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -200,7 +210,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
- size = sendToClient(message, props, deliveryTag);
+ size = sendToClient(consumer, message, props, deliveryTag);
}
ref.release();
@@ -287,11 +297,12 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
+ * @param consumer
* @param entry The message to send
* @param batch
*/
@Override
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
@@ -301,9 +312,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
long deliveryTag = getChannel().getNextDeliveryTag();
addUnacknowledgedMessage(entry);
- recordMessageDelivery(entry, deliveryTag);
+ recordMessageDelivery(consumer, entry, deliveryTag);
entry.addStateChangeListener(getReleasedStateChangeListener());
- long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
return size;
}
@@ -366,20 +377,20 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
}
- public ConsumerImpl getConsumer()
- {
- return _consumer;
- }
-
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
+ _consumers.remove(sub);
+ if(_consumers.isEmpty())
+ {
+ close();
+ }
}
@Override
public void consumerAdded(final ConsumerImpl sub)
{
- _consumer = sub;
+ _consumers.add( sub );
}
public AMQSessionModel getSessionModel()
@@ -426,12 +437,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
boolean closed = false;
State state = getState();
- final ConsumerImpl consumer = getConsumer();
+ getSendLock();
- if(consumer != null)
- {
- consumer.getSendLock();
- }
try
{
while(!closed && state != State.CLOSED)
@@ -447,10 +454,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
finally
{
- if(consumer != null)
- {
- consumer.releaseSendLock();
- }
+ releaseSendLock();
}
}
@@ -493,7 +497,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
if(!updateState(State.SUSPENDED, State.ACTIVE))
{
// this is a hack to get round the issue of increasing bytes credit
- getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ notifyCurrentState();
}
}
else
@@ -502,16 +506,20 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
}
- protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
+ protected long sendToClient(final ConsumerImpl consumer, final ServerMessage message,
+ final InstanceProperties props,
+ final long deliveryTag)
{
- return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
+ return _deliveryMethod.deliverToClient(consumer, message, props, deliveryTag);
}
- protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag)
+ protected void recordMessageDelivery(final ConsumerImpl consumer,
+ final MessageInstance entry,
+ final long deliveryTag)
{
- _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag);
+ _recordMethod.recordMessageDelivery(consumer, entry, deliveryTag);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index ff0e4199cf..c1e3d850ef 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.HashSet;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -32,13 +36,11 @@ import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-
public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
{
private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class);
@@ -59,7 +61,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
AMQChannel channel = protocolConnection.getChannel(channelId);
- VirtualHostImpl vHost = protocolConnection.getVirtualHost();
+ VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost();
if (channel == null)
{
@@ -68,25 +70,55 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
else
{
channel.sync();
+ String queueName = body.getQueue() == null ? null : body.getQueue().asString();
if (_logger.isDebugEnabled())
{
- _logger.debug("BasicConsume: from '" + body.getQueue() +
+ _logger.debug("BasicConsume: from '" + queueName +
"' for:" + body.getConsumerTag() +
" nowait:" + body.getNowait() +
" args:" + body.getArguments());
}
- MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+ MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName);
+ final Collection<MessageSource> sources = new HashSet<>();
+ if(queue != null)
+ {
+ sources.add(queue);
+ }
+ else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && body.getArguments() != null
+ && body.getArguments().get("x-multiqueue") instanceof Collection)
+ {
+ for(Object object : (Collection<Object>)body.getArguments().get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if(sourceName.length() != 0)
+ {
+ MessageSource source = vHost.getMessageSource(sourceName);
+ if(source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = body.getArguments().get("x-multiqueue").toString();
+ }
- if (queue == null)
+ if (sources.isEmpty())
{
if (_logger.isDebugEnabled())
{
- _logger.debug("No queue for '" + body.getQueue() + "'");
+ _logger.debug("No queue for '" + queueName + "'");
}
- if (body.getQueue() != null)
+ if (queueName != null)
{
- String msg = "No such queue, '" + body.getQueue() + "'";
+ String msg = "No such queue, '" + queueName + "'";
throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
else
@@ -114,7 +146,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
- queue,
+ sources,
!body.getNoAck(),
body.getArguments(),
body.getExclusive(),
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index 58989bbef9..235d263798 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.Collections;
import java.util.List;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -138,7 +139,9 @@ public class AcknowledgeTest extends QpidTestCase
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false);
+ AMQShortString subscriber = _channel.consumeFromSource(null,
+ Collections.singleton(_queue),
+ true, null, true, false);
getQueue().deliverAsync();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index cf065de38a..a4402efc84 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.Collections;
import java.util.List;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -143,6 +144,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.consumeFromSource(null, queue, true, filters, true, false);
+ return channel.consumeFromSource(null, Collections.singleton(queue), true, filters, true, false);
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 918a890af5..c5d9a5e35d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -93,7 +93,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
boolean closed = false;
State state = getState();
- getConsumer().getSendLock();
+ getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -108,11 +108,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
finally
{
- getConsumer().releaseSendLock();
+ releaseSendLock();
}
}
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// TODO
long size = entry.getMessage().getSize();
@@ -515,7 +515,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
-
+ close();
}
@Override
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 34f08615ad..8a3ef65979 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1065,6 +1065,12 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index a3b1f932ac..3f873a24ff 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.management.amqp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageSource;
@@ -27,19 +31,12 @@ import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.StateChangeListener;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
class ManagementNodeConsumer implements ConsumerImpl
{
private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
private final ManagementNode _managementNode;
private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
private final ConsumerTarget _target;
- private final Lock _stateChangeLock = new ReentrantLock();
private final String _name;
private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener();
@@ -49,7 +46,7 @@ class ManagementNodeConsumer implements ConsumerImpl
_name = consumerName;
_managementNode = managementNode;
_target = target;
- target.setStateListener(_targetChangeListener);
+ target.addStateListener(_targetChangeListener);
}
@Override
@@ -133,19 +130,19 @@ class ManagementNodeConsumer implements ConsumerImpl
@Override
public boolean trySendLock()
{
- return _stateChangeLock.tryLock();
+ return _target.trySendLock();
}
@Override
public void getSendLock()
{
- _stateChangeLock.lock();
+ _target.getSendLock();
}
@Override
public void releaseSendLock()
{
- _stateChangeLock.unlock();
+ _target.releaseSendLock();
}
@@ -174,13 +171,13 @@ class ManagementNodeConsumer implements ConsumerImpl
void send(final InternalMessage response)
{
- getSendLock();
+ _target.getSendLock();
try
{
final ManagementResponse responseEntry = new ManagementResponse(this, response);
if(_queue.isEmpty() && _target.allocateCredit(response))
{
- _target.send(responseEntry,false);
+ _target.send(this, responseEntry, false);
}
else
{
@@ -189,7 +186,7 @@ class ManagementNodeConsumer implements ConsumerImpl
}
finally
{
- releaseSendLock();
+ _target.releaseSendLock();
}
}
@@ -209,7 +206,7 @@ class ManagementNodeConsumer implements ConsumerImpl
private void deliverMessages()
{
- getSendLock();
+ _target.getSendLock();
try
{
while(!_queue.isEmpty())
@@ -219,7 +216,7 @@ class ManagementNodeConsumer implements ConsumerImpl
if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
{
_queue.remove(0);
- _target.send(managementResponse,false);
+ _target.send(this, managementResponse, false);
}
else
{
@@ -229,7 +226,7 @@ class ManagementNodeConsumer implements ConsumerImpl
}
finally
{
- releaseSendLock();
+ _target.releaseSendLock();
}
}
}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 03e7eab61b..501ce40db7 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -84,6 +84,12 @@ class ManagementResponse implements MessageInstance
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ return _consumer;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return consumer == _consumer && !isDeleted();