diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-26 17:01:07 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-26 17:01:07 +0000 |
| commit | 1635ca5849b7c765d5d7be9cd01d46b06349f320 (patch) | |
| tree | f304353182e02369661b8ecfdde357a288b183e3 /qpid/java/broker-plugins | |
| parent | 8328e0398707d2cccdadb95fe1b4c4563b930cc1 (diff) | |
| download | qpid-python-1635ca5849b7c765d5d7be9cd01d46b06349f320.tar.gz | |
QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620659 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
14 files changed, 295 insertions, 152 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index db7ee54cb2..088c80c219 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -22,7 +22,9 @@ package org.apache.qpid.server.protocol.v0_10; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -37,6 +39,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueConsumer; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -77,7 +80,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final Map<String, Object> _arguments; private int _deferredMessageCredit; private long _deferredSizeCredit; - private ConsumerImpl _consumer; + private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>(); public ConsumerTarget_0_10(ServerSession session, @@ -101,11 +104,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _name = name; } - public ConsumerImpl getConsumer() - { - return _consumer; - } - public boolean isSuspended() { return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension @@ -116,11 +114,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC boolean closed = false; State state = getState(); - final ConsumerImpl consumer = getConsumer(); - if(consumer != null) - { - consumer.getSendLock(); - } + getSendLock(); try { while(!closed && state != State.CLOSED) @@ -135,10 +129,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC } finally { - if(consumer != null) - { - consumer.releaseSendLock(); - } + releaseSendLock(); } return closed; @@ -153,7 +144,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC if(!updateState(State.SUSPENDED, State.ACTIVE)) { // this is a hack to get round the issue of increasing bytes credit - getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE); + notifyCurrentState(); } } else @@ -200,7 +191,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final AddMessageDispositionListenerAction _postIdSettingAction; - public long send(final MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { ServerMessage serverMsg = entry.getMessage(); @@ -303,12 +294,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); - xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED) - : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body); + xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, body, BATCHED) + : new MessageTransfer(_name,_acceptMode,_acquireMode,header, body); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) { - xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); + xfr.setCompletionListener(new MessageAcceptCompletionListener(this, consumer, _session, entry, _flowMode == MessageFlowMode.WINDOW)); } else if(_flowMode == MessageFlowMode.WINDOW) { @@ -325,11 +316,11 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _postIdSettingAction.setXfr(xfr); if(_acceptMode == MessageAcceptMode.EXPLICIT) { - _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this)); + _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer)); } else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED) { - _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this)); + _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer)); } else { @@ -401,12 +392,23 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { entry.setRedelivered(); entry.routeToAlternate(null, null); - if(entry.isAcquiredBy(getConsumer())) + if(isAcquiredByConsumer(entry)) { entry.delete(); } } + private boolean isAcquiredByConsumer(final MessageInstance entry) + { + ConsumerImpl acquiringConsumer = entry.getAcquiringConsumer(); + if(acquiringConsumer instanceof QueueConsumer) + { + return ((QueueConsumer)acquiringConsumer).getTarget() == this; + } + + return false; + } + void release(final MessageInstance entry, final boolean setRedelivered) { if (setRedelivered) @@ -503,7 +505,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { try { - getConsumer().getSendLock(); + getSendLock(); updateState(State.ACTIVE, State.SUSPENDED); _stopped.set(true); @@ -512,7 +514,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC } finally { - getConsumer().releaseSendLock(); + releaseSendLock(); } } @@ -572,7 +574,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC public boolean deleteAcquired(MessageInstance entry) { - if(entry.isAcquiredBy(getConsumer())) + if(isAcquiredByConsumer(entry)) { acquisitionRemoved(entry); entry.delete(); @@ -594,7 +596,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC public void flush() { flushCreditState(true); - getConsumer().flush(); + for(ConsumerImpl consumer : _consumers) + { + consumer.flush(); + } stop(); } @@ -626,12 +631,17 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC @Override public void consumerAdded(final ConsumerImpl sub) { - _consumer = sub; + _consumers.add(sub); } @Override public void consumerRemoved(final ConsumerImpl sub) { + _consumers.remove(sub); + if(_consumers.isEmpty()) + { + close(); + } } public long getUnacknowledgedBytes() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index 95dba11ea0..66f8c97063 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; @@ -32,16 +33,20 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private final MessageInstance _entry; private final ConsumerTarget_0_10 _target; + private final ConsumerImpl _consumer; - public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target) + public ExplicitAcceptDispositionChangeListener(MessageInstance entry, + ConsumerTarget_0_10 target, + final ConsumerImpl consumer) { _entry = entry; _target = target; + _consumer = consumer; } public void onAccept() { - if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition()) + if(_target != null && _entry.isAcquiredBy(_consumer) && _entry.lockAcquisition()) { _target.getSessionModel().acknowledge(_target, _entry); } @@ -54,7 +59,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) + if(_target != null && _entry.isAcquiredBy(_consumer)) { _target.release(_entry, setRedelivered); } @@ -66,7 +71,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) + if(_target != null && _entry.isAcquiredBy(_consumer)) { _target.reject(_entry); } @@ -79,7 +84,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - return _entry.acquire(_target.getConsumer()); + return _entry.acquire(_consumer); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java index c459364dbb..5467a57fa1 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener @@ -30,12 +31,16 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private final MessageInstance _entry; - private ConsumerTarget_0_10 _target; + private final ConsumerTarget_0_10 _target; + private final ConsumerImpl _consumer; - public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target) + public ImplicitAcceptDispositionChangeListener(MessageInstance entry, + ConsumerTarget_0_10 target, + final ConsumerImpl consumer) { _entry = entry; _target = target; + _consumer = consumer; } public void onAccept() @@ -45,7 +50,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - if(_entry.isAcquiredBy(_target.getConsumer())) + if(_entry.isAcquiredBy(_consumer)) { _target.release(_entry, setRedelivered); } @@ -57,7 +62,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - if(_entry.isAcquiredBy(_target.getConsumer())) + if(_entry.isAcquiredBy(_consumer)) { _target.reject(_entry); } @@ -70,7 +75,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - boolean acquired = _entry.acquire(_target.getConsumer()); + boolean acquired = _entry.acquire(_consumer); if(acquired) { _target.recordUnacknowledged(_entry); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index 7917b7989a..d581d146a8 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.transport.Method; @@ -29,16 +30,22 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene private final ConsumerTarget_0_10 _sub; private final MessageInstance _entry; private final ServerSession _session; + private final ConsumerImpl _consumer; private long _messageSize; private boolean _restoreCredit; - public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) + public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, + final ConsumerImpl consumer, + ServerSession session, + MessageInstance entry, + boolean restoreCredit) { super(); _sub = sub; _entry = entry; _session = session; _restoreCredit = restoreCredit; + _consumer = consumer; if(restoreCredit) { _messageSize = entry.getMessage().getSize(); @@ -51,7 +58,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene { _sub.getCreditManager().restoreCredit(1l, _messageSize); } - if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition()) + if(_entry.isAcquiredBy(_consumer) && _entry.lockAcquisition()) { _session.acknowledge(_sub, _entry); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 5adeba66b1..14082091f9 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -25,6 +25,7 @@ import java.security.AccessControlException; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -198,15 +199,44 @@ public class ServerSessionDelegate extends SessionDelegate else { String queueName = method.getQueue(); - VirtualHostImpl vhost = getVirtualHost(session); + VirtualHostImpl<?,?,?> vhost = getVirtualHost(session); + final Collection<MessageSource> sources = new HashSet<>(); final MessageSource queue = vhost.getMessageSource(queueName); + if(queue != null) + { + sources.add(queue); + } + else if(vhost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") + && method.getArguments() != null + && method.getArguments().get("x-multiqueue") instanceof Collection) + { + for(Object object : (Collection<Object>)method.getArguments().get("x-multiqueue")) + { + String sourceName = String.valueOf(object); + sourceName = sourceName.trim(); + if(sourceName.length() != 0) + { + MessageSource source = vhost.getMessageSource(sourceName); + if(source == null) + { + sources.clear(); + break; + } + else + { + sources.add(source); + } + } + } + queueName = method.getArguments().get("x-multiqueue").toString(); + } - if(queue == null) + if(sources.isEmpty()) { - exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); } - else if(!queue.verifySessionAccess((ServerSession)session)) + else if(!verifySessionAccess((ServerSession) session, sources)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } @@ -250,12 +280,15 @@ public class ServerSessionDelegate extends SessionDelegate { options.add(ConsumerImpl.Option.EXCLUSIVE); } - ((ServerSession)session).register( - queue.addConsumer(target, - filterManager, - MessageTransferMessage.class, - destination, - options)); + for(MessageSource source : sources) + { + ((ServerSession) session).register( + source.addConsumer(target, + filterManager, + MessageTransferMessage.class, + destination, + options)); + } } catch (AMQQueue.ExistingExclusiveConsumer existing) { @@ -278,6 +311,23 @@ public class ServerSessionDelegate extends SessionDelegate } } + protected boolean verifySessionAccess(final ServerSession session, final Collection<MessageSource> queues) + { + for(MessageSource source : queues) + { + if(!verifySessionAccess(session, source)) + { + return false; + } + } + return true; + } + + protected boolean verifySessionAccess(final ServerSession session, final MessageSource queue) + { + return queue.verifySessionAccess(session); + } + @Override public void messageTransfer(Session ssn, final MessageTransfer xfr) { @@ -820,17 +870,15 @@ public class ServerSessionDelegate extends SessionDelegate return destination; } - private VirtualHostImpl getVirtualHost(Session session) + private VirtualHostImpl<?,?,?> getVirtualHost(Session session) { ServerConnection conn = getServerConnection(session); - VirtualHostImpl vhost = conn.getVirtualHost(); - return vhost; + return conn.getVirtualHost(); } private ServerConnection getServerConnection(Session session) { - ServerConnection conn = (ServerConnection) session.getConnection(); - return conn; + return (ServerConnection) session.getConnection(); } @Override @@ -1238,7 +1286,7 @@ public class ServerSessionDelegate extends SessionDelegate exception(session, method, errorCode, description); } - else if (!queue.verifySessionAccess((ServerSession)session)) + else if (!verifySessionAccess((ServerSession) session, queue)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1296,7 +1344,7 @@ public class ServerSessionDelegate extends SessionDelegate catch(QueueExistsException qe) { queue = qe.getExistingQueue(); - if (!queue.verifySessionAccess((ServerSession)session)) + if (!verifySessionAccess((ServerSession) session, queue)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1357,7 +1405,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if(!queue.verifySessionAccess((ServerSession)session)) + if(!verifySessionAccess((ServerSession) session, queue)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 49bc26149e..d1ec2e139e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -61,6 +61,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -543,10 +544,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } - public ConsumerImpl getSubscription(AMQShortString tag) + public ConsumerTarget getSubscription(AMQShortString tag) { - final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); - return target == null ? null : target.getConsumer(); + return _tag2SubscriptionTargetMap.get(tag); } /** @@ -555,7 +555,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * * * @param tag the tag chosen by the client (if null, server will generate one) - * @param source the queue to subscribe to + * @param sources the queues to subscribe to * @param acks Are acks enabled for this subscriber * @param filters Filters to apply to this subscriber * @@ -564,7 +564,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * * @throws org.apache.qpid.AMQException if something goes wrong */ - public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, + public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, @@ -632,18 +632,21 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } }); } - ConsumerImpl sub = - source.addConsumer(target, - filterManager, - AMQMessage.class, - AMQShortString.toString(tag), - options); - if(sub instanceof Consumer<?>) + for(MessageSource source : sources) { - final Consumer<?> modelConsumer = (Consumer<?>) sub; - consumerAdded(modelConsumer); - modelConsumer.addChangeListener(_consumerClosedListener); - _consumers.add(modelConsumer); + ConsumerImpl sub = + source.addConsumer(target, + filterManager, + AMQMessage.class, + AMQShortString.toString(tag), + options); + if (sub instanceof Consumer<?>) + { + final Consumer<?> modelConsumer = (Consumer<?>) sub; + consumerAdded(modelConsumer); + modelConsumer.addChangeListener(_consumerClosedListener); + _consumers.add(modelConsumer); + } } } catch (AccessControlException e) @@ -683,13 +686,16 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); - ConsumerImpl sub = target == null ? null : target.getConsumer(); - if (sub != null) + Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers(); + if (subs != null) { - sub.close(); - if(sub instanceof Consumer<?>) + for(ConsumerImpl sub : subs) { - _consumers.remove(sub); + sub.close(); + if (sub instanceof Consumer<?>) + { + _consumers.remove(sub); + } } return true; } @@ -763,11 +769,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>> _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - ConsumerImpl sub = me.getValue().getConsumer(); + Collection<ConsumerImpl> subs = me.getValue().getConsumers(); - if(sub != null) + if(subs != null) { - sub.close(); + for(ConsumerImpl sub : subs) + { + sub.close(); + } } } @@ -1032,7 +1041,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // may need to deliver queued messages for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values()) { - s.getConsumer().externalStateChange(); + for(ConsumerImpl sub : s.getConsumers()) + { + sub.externalStateChange(); + } } } @@ -1050,11 +1062,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { try { - s.getConsumer().getSendLock(); + s.getSendLock(); } finally { - s.getConsumer().releaseSendLock(); + s.releaseSendLock(); } } } @@ -1133,8 +1145,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // ensure all subscriptions have seen the change to the channel state for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) { - sub.getConsumer().getSendLock(); - sub.getConsumer().releaseSendLock(); + sub.getSendLock(); + sub.releaseSendLock(); } try @@ -1169,9 +1181,12 @@ public class AMQChannel<T extends AMQProtocolSession<T>> if(requiresSuspend) { _suspended.set(false); - for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) + for(ConsumerTarget_0_8 target : _tag2SubscriptionTargetMap.values()) { - sub.getConsumer().externalStateChange(); + for(ConsumerImpl sub : target.getConsumers()) + { + sub.externalStateChange(); + } } } @@ -1179,7 +1194,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public String toString() { - return "["+_session.toString()+":"+_channelId+"]"; + return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]"; } public void setDefaultQueue(AMQQueue queue) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index d5eed242e7..acb74c99e6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -73,7 +75,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen private final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - private ConsumerImpl _consumer; + private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>(); public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, @@ -93,6 +95,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } + public List<ConsumerImpl> getConsumers() + { + return _consumers; + } + static final class BrowserConsumer extends ConsumerTarget_0_8 { public BrowserConsumer(AMQChannel channel, @@ -111,12 +118,14 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * thread safe. * * + * + * @param consumer * @param entry * @param batch * @throws org.apache.qpid.AMQException */ @Override - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -124,7 +133,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } @@ -173,11 +182,12 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * + * @param consumer * @param entry The message to send * @param batch */ @Override - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -200,7 +210,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen getChannel().getProtocolSession().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); - size = sendToClient(message, props, deliveryTag); + size = sendToClient(consumer, message, props, deliveryTag); } ref.release(); @@ -287,11 +297,12 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * + * @param consumer * @param entry The message to send * @param batch */ @Override - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { @@ -301,9 +312,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen long deliveryTag = getChannel().getNextDeliveryTag(); addUnacknowledgedMessage(entry); - recordMessageDelivery(entry, deliveryTag); + recordMessageDelivery(consumer, entry, deliveryTag); entry.addStateChangeListener(getReleasedStateChangeListener()); - long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); entry.incrementDeliveryCount(); return size; } @@ -366,20 +377,20 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - public ConsumerImpl getConsumer() - { - return _consumer; - } - @Override public void consumerRemoved(final ConsumerImpl sub) { + _consumers.remove(sub); + if(_consumers.isEmpty()) + { + close(); + } } @Override public void consumerAdded(final ConsumerImpl sub) { - _consumer = sub; + _consumers.add( sub ); } public AMQSessionModel getSessionModel() @@ -426,12 +437,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen boolean closed = false; State state = getState(); - final ConsumerImpl consumer = getConsumer(); + getSendLock(); - if(consumer != null) - { - consumer.getSendLock(); - } try { while(!closed && state != State.CLOSED) @@ -447,10 +454,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } finally { - if(consumer != null) - { - consumer.releaseSendLock(); - } + releaseSendLock(); } } @@ -493,7 +497,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen if(!updateState(State.SUSPENDED, State.ACTIVE)) { // this is a hack to get round the issue of increasing bytes credit - getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE); + notifyCurrentState(); } } else @@ -502,16 +506,20 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) + protected long sendToClient(final ConsumerImpl consumer, final ServerMessage message, + final InstanceProperties props, + final long deliveryTag) { - return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag); + return _deliveryMethod.deliverToClient(consumer, message, props, deliveryTag); } - protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag) + protected void recordMessageDelivery(final ConsumerImpl consumer, + final MessageInstance entry, + final long deliveryTag) { - _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag); + _recordMethod.recordMessageDelivery(consumer, entry, deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index ff0e4199cf..c1e3d850ef 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; +import java.util.Collection; +import java.util.HashSet; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -32,13 +36,11 @@ import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody> { private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class); @@ -59,7 +61,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); AMQChannel channel = protocolConnection.getChannel(channelId); - VirtualHostImpl vHost = protocolConnection.getVirtualHost(); + VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost(); if (channel == null) { @@ -68,25 +70,55 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic else { channel.sync(); + String queueName = body.getQueue() == null ? null : body.getQueue().asString(); if (_logger.isDebugEnabled()) { - _logger.debug("BasicConsume: from '" + body.getQueue() + + _logger.debug("BasicConsume: from '" + queueName + "' for:" + body.getConsumerTag() + " nowait:" + body.getNowait() + " args:" + body.getArguments()); } - MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString()); + MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName); + final Collection<MessageSource> sources = new HashSet<>(); + if(queue != null) + { + sources.add(queue); + } + else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") + && body.getArguments() != null + && body.getArguments().get("x-multiqueue") instanceof Collection) + { + for(Object object : (Collection<Object>)body.getArguments().get("x-multiqueue")) + { + String sourceName = String.valueOf(object); + sourceName = sourceName.trim(); + if(sourceName.length() != 0) + { + MessageSource source = vHost.getMessageSource(sourceName); + if(source == null) + { + sources.clear(); + break; + } + else + { + sources.add(source); + } + } + } + queueName = body.getArguments().get("x-multiqueue").toString(); + } - if (queue == null) + if (sources.isEmpty()) { if (_logger.isDebugEnabled()) { - _logger.debug("No queue for '" + body.getQueue() + "'"); + _logger.debug("No queue for '" + queueName + "'"); } - if (body.getQueue() != null) + if (queueName != null) { - String msg = "No such queue, '" + body.getQueue() + "'"; + String msg = "No such queue, '" + queueName + "'"; throw body.getChannelException(AMQConstant.NOT_FOUND, msg); } else @@ -114,7 +146,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { AMQShortString consumerTag = channel.consumeFromSource(consumerTagName, - queue, + sources, !body.getNoAck(), body.getArguments(), body.getExclusive(), diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index 58989bbef9..235d263798 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.Collections; import java.util.List; import org.apache.qpid.exchange.ExchangeDefaults; @@ -138,7 +139,9 @@ public class AcknowledgeTest extends QpidTestCase assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); //Subscribe to the queue - AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false); + AMQShortString subscriber = _channel.consumeFromSource(null, + Collections.singleton(_queue), + true, null, true, false); getQueue().deliverAsync(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index cf065de38a..a4402efc84 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.Collections; import java.util.List; import org.apache.qpid.common.AMQPFilterTypes; @@ -143,6 +144,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase FieldTable filters = new FieldTable(); filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - return channel.consumeFromSource(null, queue, true, filters, true, false); + return channel.consumeFromSource(null, Collections.singleton(queue), true, filters, true, false); } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 918a890af5..c5d9a5e35d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -93,7 +93,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget boolean closed = false; State state = getState(); - getConsumer().getSendLock(); + getSendLock(); try { while(!closed && state != State.CLOSED) @@ -108,11 +108,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } finally { - getConsumer().releaseSendLock(); + releaseSendLock(); } } - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // TODO long size = entry.getMessage().getSize(); @@ -515,7 +515,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget @Override public void consumerRemoved(final ConsumerImpl sub) { - + close(); } @Override diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index 34f08615ad..8a3ef65979 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -1065,6 +1065,12 @@ class ManagementNode implements MessageSource, MessageDestination } @Override + public ConsumerImpl getAcquiringConsumer() + { + return null; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index a3b1f932ac..3f873a24ff 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.management.amqp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageSource; @@ -27,19 +31,12 @@ import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.StateChangeListener; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - class ManagementNodeConsumer implements ConsumerImpl { private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement(); private final ManagementNode _managementNode; private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>()); private final ConsumerTarget _target; - private final Lock _stateChangeLock = new ReentrantLock(); private final String _name; private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener(); @@ -49,7 +46,7 @@ class ManagementNodeConsumer implements ConsumerImpl _name = consumerName; _managementNode = managementNode; _target = target; - target.setStateListener(_targetChangeListener); + target.addStateListener(_targetChangeListener); } @Override @@ -133,19 +130,19 @@ class ManagementNodeConsumer implements ConsumerImpl @Override public boolean trySendLock() { - return _stateChangeLock.tryLock(); + return _target.trySendLock(); } @Override public void getSendLock() { - _stateChangeLock.lock(); + _target.getSendLock(); } @Override public void releaseSendLock() { - _stateChangeLock.unlock(); + _target.releaseSendLock(); } @@ -174,13 +171,13 @@ class ManagementNodeConsumer implements ConsumerImpl void send(final InternalMessage response) { - getSendLock(); + _target.getSendLock(); try { final ManagementResponse responseEntry = new ManagementResponse(this, response); if(_queue.isEmpty() && _target.allocateCredit(response)) { - _target.send(responseEntry,false); + _target.send(this, responseEntry, false); } else { @@ -189,7 +186,7 @@ class ManagementNodeConsumer implements ConsumerImpl } finally { - releaseSendLock(); + _target.releaseSendLock(); } } @@ -209,7 +206,7 @@ class ManagementNodeConsumer implements ConsumerImpl private void deliverMessages() { - getSendLock(); + _target.getSendLock(); try { while(!_queue.isEmpty()) @@ -219,7 +216,7 @@ class ManagementNodeConsumer implements ConsumerImpl if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage())) { _queue.remove(0); - _target.send(managementResponse,false); + _target.send(this, managementResponse, false); } else { @@ -229,7 +226,7 @@ class ManagementNodeConsumer implements ConsumerImpl } finally { - releaseSendLock(); + _target.releaseSendLock(); } } } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index 03e7eab61b..501ce40db7 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -84,6 +84,12 @@ class ManagementResponse implements MessageInstance } @Override + public ConsumerImpl getAcquiringConsumer() + { + return _consumer; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return consumer == _consumer && !isDeleted(); |
