diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-26 17:01:07 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-26 17:01:07 +0000 |
| commit | 1635ca5849b7c765d5d7be9cd01d46b06349f320 (patch) | |
| tree | f304353182e02369661b8ecfdde357a288b183e3 | |
| parent | 8328e0398707d2cccdadb95fe1b4c4563b930cc1 (diff) | |
| download | qpid-python-1635ca5849b7c765d5d7be9cd01d46b06349f320.tar.gz | |
QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620659 13f79535-47bb-0310-9956-ffa450edef68
31 files changed, 516 insertions, 252 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index aa721e598a..192164ca6b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -20,16 +20,24 @@ */ package org.apache.qpid.server.consumer; -import org.apache.qpid.server.util.StateChangeListener; - +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpid.server.util.StateChangeListener; public abstract class AbstractConsumerTarget implements ConsumerTarget { private final AtomicReference<State> _state; - private final AtomicReference<StateChangeListener<ConsumerTarget, State>> _stateListener = - new AtomicReference<StateChangeListener<ConsumerTarget, State>>(); + + private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new + CopyOnWriteArraySet<>(); + + private final Lock _stateChangeLock = new ReentrantLock(); + protected AbstractConsumerTarget(final State initialState) { @@ -46,8 +54,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget { if(_state.compareAndSet(from, to)) { - StateChangeListener<ConsumerTarget, State> listener = _stateListener.get(); - if(listener != null) + for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners) { listener.stateChanged(this, from, to); } @@ -59,15 +66,38 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget } } + public final void notifyCurrentState() + { + for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners) + { + State state = getState(); + listener.stateChanged(this, state, state); + } + } + public final void addStateListener(StateChangeListener<ConsumerTarget, State> listener) + { + _stateChangeListeners.add(listener); + } + + @Override + public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener) + { + _stateChangeListeners.remove(listener); + } + + public final boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } - public final void setStateListener(StateChangeListener<ConsumerTarget, State> listener) + public final void getSendLock() { - _stateListener.set(listener); + _stateChangeLock.lock(); } - public final StateChangeListener<ConsumerTarget, State> getStateListener() + public final void releaseSendLock() { - return _stateListener.get(); + _stateChangeLock.unlock(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index b7be1bfd9b..5aef922da5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -31,6 +31,8 @@ public interface ConsumerTarget void acquisitionRemoved(MessageInstance node); + void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener); + enum State { ACTIVE, SUSPENDED, CLOSED @@ -42,7 +44,7 @@ public interface ConsumerTarget void consumerRemoved(ConsumerImpl sub); - void setStateListener(StateChangeListener<ConsumerTarget, State> listener); + void addStateListener(StateChangeListener<ConsumerTarget, State> listener); long getUnacknowledgedBytes(); @@ -50,7 +52,7 @@ public interface ConsumerTarget AMQSessionModel getSessionModel(); - long send(MessageInstance entry, boolean batch); + long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch); void flushBatched(); @@ -65,4 +67,11 @@ public interface ConsumerTarget boolean isSuspended(); boolean close(); + + boolean trySendLock(); + + void getSendLock(); + + void releaseSendLock(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 1bf451948d..d3518f428b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -79,6 +79,8 @@ public interface MessageInstance Filterable asFilterable(); + ConsumerImpl getAcquiringConsumer(); + public static enum State { AVAILABLE, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 71b7636159..02cd7ff56f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.model.Consumer; @@ -52,4 +53,6 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, MessageInstance.ConsumerAcquiredState<X> getOwningState(); QueueContext getQueueContext(); + + ConsumerTarget getTarget(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 4044c938db..60bad7bf1c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -31,8 +31,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; @@ -65,7 +63,6 @@ class QueueConsumerImpl private final AtomicBoolean _targetClosed = new AtomicBoolean(false); private final AtomicBoolean _closed = new AtomicBoolean(false); private final long _consumerNumber; - private final Lock _stateChangeLock = new ReentrantLock(); private final long _createTime = System.currentTimeMillis(); private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this); private final boolean _acquires; @@ -90,6 +87,8 @@ class QueueConsumerImpl private final ConsumerTarget _target; private final SubFlushRunner _runner = new SubFlushRunner(this); + private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> + _listener; private volatile QueueContext _queueContext; private StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>() { @@ -134,17 +133,17 @@ class QueueConsumerImpl setupLogging(); - _target.setStateListener( - new StateChangeListener<ConsumerTarget, ConsumerTarget.State>() - { - @Override - public void stateChanged(final ConsumerTarget object, - final ConsumerTarget.State oldState, - final ConsumerTarget.State newState) - { - targetStateChanged(oldState, newState); - } - }); + _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>() + { + @Override + public void stateChanged(final ConsumerTarget object, + final ConsumerTarget.State oldState, + final ConsumerTarget.State newState) + { + targetStateChanged(oldState, newState); + } + }; + _target.addStateListener(_listener); } private static Map<String, Object> createAttributeMap(String name, FilterManager filters, EnumSet<Option> optionSet) @@ -202,6 +201,7 @@ class QueueConsumerImpl } } + @Override public ConsumerTarget getTarget() { return _target; @@ -248,17 +248,17 @@ class QueueConsumerImpl { if(_closed.compareAndSet(false,true)) { - getSendLock(); + _target.getSendLock(); try { - _target.close(); _target.consumerRemoved(this); + _target.removeStateChangeListener(_listener); _queue.unregisterConsumer(this); deleted(); } finally { - releaseSendLock(); + _target.releaseSendLock(); } } @@ -420,17 +420,17 @@ class QueueConsumerImpl public final boolean trySendLock() { - return _stateChangeLock.tryLock(); + return getTarget().trySendLock(); } public final void getSendLock() { - _stateChangeLock.lock(); + getTarget().getSendLock(); } public final void releaseSendLock() { - _stateChangeLock.unlock(); + getTarget().releaseSendLock(); } public final long getCreateTime() @@ -471,7 +471,7 @@ class QueueConsumerImpl public final void send(final QueueEntry entry, final boolean batch) { _deliveredCount.incrementAndGet(); - long size = _target.send(entry, batch); + long size = _target.send(this, entry, batch); _deliveredBytes.addAndGet(size); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 6c541d78ef..a0f2dc798d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -247,6 +247,26 @@ public abstract class QueueEntryImpl implements QueueEntry } @Override + public ConsumerImpl getAcquiringConsumer() + { + ConsumerImpl consumer; + EntryState state = _state; + if(state instanceof ConsumerAcquiredState) + { + consumer = ((ConsumerAcquiredState)state).getConsumer(); + } + else if(state instanceof LockedAcquiredState) + { + consumer = ((LockedAcquiredState)state).getConsumer(); + } + else + { + consumer = null; + } + return consumer; + } + + @Override public boolean isAcquiredBy(ConsumerImpl consumer) { EntryState state = _state; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index 30f6fa66c6..e784126d24 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -21,18 +21,18 @@ package org.apache.qpid.server.queue; import java.security.PrivilegedAction; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import javax.security.auth.Subject; + import org.apache.log4j.Logger; + import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.TransportException; -import javax.security.auth.Subject; - /** * QueueRunners are Runnables used to process a queue when requiring * asynchronous message delivery to consumers, which is necessary @@ -111,7 +111,7 @@ public class QueueRunner implements Runnable public String toString() { - return "QueueRunner-" + _queue.getLogSubject(); + return "QueueRunner-" + _queue.getLogSubject().toLogString(); } public void execute() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index 910727ce42..750db4aba5 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -21,18 +21,18 @@ package org.apache.qpid.server.queue; +import java.security.PrivilegedAction; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.security.auth.Subject; + import org.apache.log4j.Logger; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.TransportException; -import javax.security.auth.Subject; -import java.security.PrivilegedAction; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - class SubFlushRunner implements Runnable { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index ad33ecadcf..1482576748 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -167,7 +167,7 @@ public class MockConsumer implements ConsumerTarget { } - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { long size = entry.getMessage().getSize(); if (messages.contains(entry)) @@ -202,7 +202,7 @@ public class MockConsumer implements ConsumerTarget @Override public void consumerRemoved(final ConsumerImpl sub) { - + close(); } public void setState(State state) @@ -216,11 +216,20 @@ public class MockConsumer implements ConsumerTarget } @Override - public void setStateListener(final StateChangeListener<ConsumerTarget, State> listener) + public void addStateListener(final StateChangeListener<ConsumerTarget, State> listener) { _listener = listener; } + @Override + public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener) + { + if(_listener == listener) + { + _listener = null; + } + } + public ArrayList<MessageInstance> getMessages() { return messages; @@ -242,6 +251,23 @@ public class MockConsumer implements ConsumerTarget _isActive = isActive; } + + public final boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } + + public final void getSendLock() + { + _stateChangeLock.lock(); + } + + public final void releaseSendLock() + { + _stateChangeLock.unlock(); + } + + private static class MockSessionModel implements AMQSessionModel { private final UUID _id = UUID.randomUUID(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java index 37c4eeb127..08c0de95d5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java @@ -60,6 +60,12 @@ public class MockMessageInstance implements MessageInstance } @Override + public ConsumerImpl getAcquiringConsumer() + { + return null; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index f13886d2b2..7fc36f39bb 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -205,12 +205,13 @@ public class StandardQueueTest extends AbstractQueueTestBase { /** * Send a message and decrement latch + * @param consumer * @param entry * @param batch */ - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { - long size = super.send(entry, batch); + long size = super.send(consumer, entry, batch); latch.countDown(); return size; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index db7ee54cb2..088c80c219 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -22,7 +22,9 @@ package org.apache.qpid.server.protocol.v0_10; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -37,6 +39,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueConsumer; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -77,7 +80,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final Map<String, Object> _arguments; private int _deferredMessageCredit; private long _deferredSizeCredit; - private ConsumerImpl _consumer; + private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>(); public ConsumerTarget_0_10(ServerSession session, @@ -101,11 +104,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _name = name; } - public ConsumerImpl getConsumer() - { - return _consumer; - } - public boolean isSuspended() { return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension @@ -116,11 +114,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC boolean closed = false; State state = getState(); - final ConsumerImpl consumer = getConsumer(); - if(consumer != null) - { - consumer.getSendLock(); - } + getSendLock(); try { while(!closed && state != State.CLOSED) @@ -135,10 +129,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC } finally { - if(consumer != null) - { - consumer.releaseSendLock(); - } + releaseSendLock(); } return closed; @@ -153,7 +144,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC if(!updateState(State.SUSPENDED, State.ACTIVE)) { // this is a hack to get round the issue of increasing bytes credit - getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE); + notifyCurrentState(); } } else @@ -200,7 +191,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final AddMessageDispositionListenerAction _postIdSettingAction; - public long send(final MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { ServerMessage serverMsg = entry.getMessage(); @@ -303,12 +294,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); - xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED) - : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body); + xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, body, BATCHED) + : new MessageTransfer(_name,_acceptMode,_acquireMode,header, body); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) { - xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); + xfr.setCompletionListener(new MessageAcceptCompletionListener(this, consumer, _session, entry, _flowMode == MessageFlowMode.WINDOW)); } else if(_flowMode == MessageFlowMode.WINDOW) { @@ -325,11 +316,11 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _postIdSettingAction.setXfr(xfr); if(_acceptMode == MessageAcceptMode.EXPLICIT) { - _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this)); + _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer)); } else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED) { - _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this)); + _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer)); } else { @@ -401,12 +392,23 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { entry.setRedelivered(); entry.routeToAlternate(null, null); - if(entry.isAcquiredBy(getConsumer())) + if(isAcquiredByConsumer(entry)) { entry.delete(); } } + private boolean isAcquiredByConsumer(final MessageInstance entry) + { + ConsumerImpl acquiringConsumer = entry.getAcquiringConsumer(); + if(acquiringConsumer instanceof QueueConsumer) + { + return ((QueueConsumer)acquiringConsumer).getTarget() == this; + } + + return false; + } + void release(final MessageInstance entry, final boolean setRedelivered) { if (setRedelivered) @@ -503,7 +505,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { try { - getConsumer().getSendLock(); + getSendLock(); updateState(State.ACTIVE, State.SUSPENDED); _stopped.set(true); @@ -512,7 +514,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC } finally { - getConsumer().releaseSendLock(); + releaseSendLock(); } } @@ -572,7 +574,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC public boolean deleteAcquired(MessageInstance entry) { - if(entry.isAcquiredBy(getConsumer())) + if(isAcquiredByConsumer(entry)) { acquisitionRemoved(entry); entry.delete(); @@ -594,7 +596,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC public void flush() { flushCreditState(true); - getConsumer().flush(); + for(ConsumerImpl consumer : _consumers) + { + consumer.flush(); + } stop(); } @@ -626,12 +631,17 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC @Override public void consumerAdded(final ConsumerImpl sub) { - _consumer = sub; + _consumers.add(sub); } @Override public void consumerRemoved(final ConsumerImpl sub) { + _consumers.remove(sub); + if(_consumers.isEmpty()) + { + close(); + } } public long getUnacknowledgedBytes() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index 95dba11ea0..66f8c97063 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; @@ -32,16 +33,20 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private final MessageInstance _entry; private final ConsumerTarget_0_10 _target; + private final ConsumerImpl _consumer; - public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target) + public ExplicitAcceptDispositionChangeListener(MessageInstance entry, + ConsumerTarget_0_10 target, + final ConsumerImpl consumer) { _entry = entry; _target = target; + _consumer = consumer; } public void onAccept() { - if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition()) + if(_target != null && _entry.isAcquiredBy(_consumer) && _entry.lockAcquisition()) { _target.getSessionModel().acknowledge(_target, _entry); } @@ -54,7 +59,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) + if(_target != null && _entry.isAcquiredBy(_consumer)) { _target.release(_entry, setRedelivered); } @@ -66,7 +71,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) + if(_target != null && _entry.isAcquiredBy(_consumer)) { _target.reject(_entry); } @@ -79,7 +84,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - return _entry.acquire(_target.getConsumer()); + return _entry.acquire(_consumer); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java index c459364dbb..5467a57fa1 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener @@ -30,12 +31,16 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private final MessageInstance _entry; - private ConsumerTarget_0_10 _target; + private final ConsumerTarget_0_10 _target; + private final ConsumerImpl _consumer; - public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target) + public ImplicitAcceptDispositionChangeListener(MessageInstance entry, + ConsumerTarget_0_10 target, + final ConsumerImpl consumer) { _entry = entry; _target = target; + _consumer = consumer; } public void onAccept() @@ -45,7 +50,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - if(_entry.isAcquiredBy(_target.getConsumer())) + if(_entry.isAcquiredBy(_consumer)) { _target.release(_entry, setRedelivered); } @@ -57,7 +62,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - if(_entry.isAcquiredBy(_target.getConsumer())) + if(_entry.isAcquiredBy(_consumer)) { _target.reject(_entry); } @@ -70,7 +75,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - boolean acquired = _entry.acquire(_target.getConsumer()); + boolean acquired = _entry.acquire(_consumer); if(acquired) { _target.recordUnacknowledged(_entry); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index 7917b7989a..d581d146a8 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.transport.Method; @@ -29,16 +30,22 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene private final ConsumerTarget_0_10 _sub; private final MessageInstance _entry; private final ServerSession _session; + private final ConsumerImpl _consumer; private long _messageSize; private boolean _restoreCredit; - public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) + public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, + final ConsumerImpl consumer, + ServerSession session, + MessageInstance entry, + boolean restoreCredit) { super(); _sub = sub; _entry = entry; _session = session; _restoreCredit = restoreCredit; + _consumer = consumer; if(restoreCredit) { _messageSize = entry.getMessage().getSize(); @@ -51,7 +58,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene { _sub.getCreditManager().restoreCredit(1l, _messageSize); } - if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition()) + if(_entry.isAcquiredBy(_consumer) && _entry.lockAcquisition()) { _session.acknowledge(_sub, _entry); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 5adeba66b1..14082091f9 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -25,6 +25,7 @@ import java.security.AccessControlException; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -198,15 +199,44 @@ public class ServerSessionDelegate extends SessionDelegate else { String queueName = method.getQueue(); - VirtualHostImpl vhost = getVirtualHost(session); + VirtualHostImpl<?,?,?> vhost = getVirtualHost(session); + final Collection<MessageSource> sources = new HashSet<>(); final MessageSource queue = vhost.getMessageSource(queueName); + if(queue != null) + { + sources.add(queue); + } + else if(vhost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") + && method.getArguments() != null + && method.getArguments().get("x-multiqueue") instanceof Collection) + { + for(Object object : (Collection<Object>)method.getArguments().get("x-multiqueue")) + { + String sourceName = String.valueOf(object); + sourceName = sourceName.trim(); + if(sourceName.length() != 0) + { + MessageSource source = vhost.getMessageSource(sourceName); + if(source == null) + { + sources.clear(); + break; + } + else + { + sources.add(source); + } + } + } + queueName = method.getArguments().get("x-multiqueue").toString(); + } - if(queue == null) + if(sources.isEmpty()) { - exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); } - else if(!queue.verifySessionAccess((ServerSession)session)) + else if(!verifySessionAccess((ServerSession) session, sources)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } @@ -250,12 +280,15 @@ public class ServerSessionDelegate extends SessionDelegate { options.add(ConsumerImpl.Option.EXCLUSIVE); } - ((ServerSession)session).register( - queue.addConsumer(target, - filterManager, - MessageTransferMessage.class, - destination, - options)); + for(MessageSource source : sources) + { + ((ServerSession) session).register( + source.addConsumer(target, + filterManager, + MessageTransferMessage.class, + destination, + options)); + } } catch (AMQQueue.ExistingExclusiveConsumer existing) { @@ -278,6 +311,23 @@ public class ServerSessionDelegate extends SessionDelegate } } + protected boolean verifySessionAccess(final ServerSession session, final Collection<MessageSource> queues) + { + for(MessageSource source : queues) + { + if(!verifySessionAccess(session, source)) + { + return false; + } + } + return true; + } + + protected boolean verifySessionAccess(final ServerSession session, final MessageSource queue) + { + return queue.verifySessionAccess(session); + } + @Override public void messageTransfer(Session ssn, final MessageTransfer xfr) { @@ -820,17 +870,15 @@ public class ServerSessionDelegate extends SessionDelegate return destination; } - private VirtualHostImpl getVirtualHost(Session session) + private VirtualHostImpl<?,?,?> getVirtualHost(Session session) { ServerConnection conn = getServerConnection(session); - VirtualHostImpl vhost = conn.getVirtualHost(); - return vhost; + return conn.getVirtualHost(); } private ServerConnection getServerConnection(Session session) { - ServerConnection conn = (ServerConnection) session.getConnection(); - return conn; + return (ServerConnection) session.getConnection(); } @Override @@ -1238,7 +1286,7 @@ public class ServerSessionDelegate extends SessionDelegate exception(session, method, errorCode, description); } - else if (!queue.verifySessionAccess((ServerSession)session)) + else if (!verifySessionAccess((ServerSession) session, queue)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1296,7 +1344,7 @@ public class ServerSessionDelegate extends SessionDelegate catch(QueueExistsException qe) { queue = qe.getExistingQueue(); - if (!queue.verifySessionAccess((ServerSession)session)) + if (!verifySessionAccess((ServerSession) session, queue)) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " @@ -1357,7 +1405,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if(!queue.verifySessionAccess((ServerSession)session)) + if(!verifySessionAccess((ServerSession) session, queue)) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 49bc26149e..d1ec2e139e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -61,6 +61,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -543,10 +544,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } - public ConsumerImpl getSubscription(AMQShortString tag) + public ConsumerTarget getSubscription(AMQShortString tag) { - final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); - return target == null ? null : target.getConsumer(); + return _tag2SubscriptionTargetMap.get(tag); } /** @@ -555,7 +555,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * * * @param tag the tag chosen by the client (if null, server will generate one) - * @param source the queue to subscribe to + * @param sources the queues to subscribe to * @param acks Are acks enabled for this subscriber * @param filters Filters to apply to this subscriber * @@ -564,7 +564,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * * @throws org.apache.qpid.AMQException if something goes wrong */ - public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, + public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, @@ -632,18 +632,21 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } }); } - ConsumerImpl sub = - source.addConsumer(target, - filterManager, - AMQMessage.class, - AMQShortString.toString(tag), - options); - if(sub instanceof Consumer<?>) + for(MessageSource source : sources) { - final Consumer<?> modelConsumer = (Consumer<?>) sub; - consumerAdded(modelConsumer); - modelConsumer.addChangeListener(_consumerClosedListener); - _consumers.add(modelConsumer); + ConsumerImpl sub = + source.addConsumer(target, + filterManager, + AMQMessage.class, + AMQShortString.toString(tag), + options); + if (sub instanceof Consumer<?>) + { + final Consumer<?> modelConsumer = (Consumer<?>) sub; + consumerAdded(modelConsumer); + modelConsumer.addChangeListener(_consumerClosedListener); + _consumers.add(modelConsumer); + } } } catch (AccessControlException e) @@ -683,13 +686,16 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); - ConsumerImpl sub = target == null ? null : target.getConsumer(); - if (sub != null) + Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers(); + if (subs != null) { - sub.close(); - if(sub instanceof Consumer<?>) + for(ConsumerImpl sub : subs) { - _consumers.remove(sub); + sub.close(); + if (sub instanceof Consumer<?>) + { + _consumers.remove(sub); + } } return true; } @@ -763,11 +769,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>> _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - ConsumerImpl sub = me.getValue().getConsumer(); + Collection<ConsumerImpl> subs = me.getValue().getConsumers(); - if(sub != null) + if(subs != null) { - sub.close(); + for(ConsumerImpl sub : subs) + { + sub.close(); + } } } @@ -1032,7 +1041,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // may need to deliver queued messages for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values()) { - s.getConsumer().externalStateChange(); + for(ConsumerImpl sub : s.getConsumers()) + { + sub.externalStateChange(); + } } } @@ -1050,11 +1062,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { try { - s.getConsumer().getSendLock(); + s.getSendLock(); } finally { - s.getConsumer().releaseSendLock(); + s.releaseSendLock(); } } } @@ -1133,8 +1145,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>> // ensure all subscriptions have seen the change to the channel state for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) { - sub.getConsumer().getSendLock(); - sub.getConsumer().releaseSendLock(); + sub.getSendLock(); + sub.releaseSendLock(); } try @@ -1169,9 +1181,12 @@ public class AMQChannel<T extends AMQProtocolSession<T>> if(requiresSuspend) { _suspended.set(false); - for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) + for(ConsumerTarget_0_8 target : _tag2SubscriptionTargetMap.values()) { - sub.getConsumer().externalStateChange(); + for(ConsumerImpl sub : target.getConsumers()) + { + sub.externalStateChange(); + } } } @@ -1179,7 +1194,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> public String toString() { - return "["+_session.toString()+":"+_channelId+"]"; + return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]"; } public void setDefaultQueue(AMQQueue queue) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index d5eed242e7..acb74c99e6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -73,7 +75,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen private final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - private ConsumerImpl _consumer; + private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>(); public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, @@ -93,6 +95,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } + public List<ConsumerImpl> getConsumers() + { + return _consumers; + } + static final class BrowserConsumer extends ConsumerTarget_0_8 { public BrowserConsumer(AMQChannel channel, @@ -111,12 +118,14 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * thread safe. * * + * + * @param consumer * @param entry * @param batch * @throws org.apache.qpid.AMQException */ @Override - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -124,7 +133,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } @@ -173,11 +182,12 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * + * @param consumer * @param entry The message to send * @param batch */ @Override - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -200,7 +210,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen getChannel().getProtocolSession().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); - size = sendToClient(message, props, deliveryTag); + size = sendToClient(consumer, message, props, deliveryTag); } ref.release(); @@ -287,11 +297,12 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * This method can be called by each of the publisher threads. As a result all changes to the channel object must be * thread safe. * + * @param consumer * @param entry The message to send * @param batch */ @Override - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { @@ -301,9 +312,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen long deliveryTag = getChannel().getNextDeliveryTag(); addUnacknowledgedMessage(entry); - recordMessageDelivery(entry, deliveryTag); + recordMessageDelivery(consumer, entry, deliveryTag); entry.addStateChangeListener(getReleasedStateChangeListener()); - long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); entry.incrementDeliveryCount(); return size; } @@ -366,20 +377,20 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - public ConsumerImpl getConsumer() - { - return _consumer; - } - @Override public void consumerRemoved(final ConsumerImpl sub) { + _consumers.remove(sub); + if(_consumers.isEmpty()) + { + close(); + } } @Override public void consumerAdded(final ConsumerImpl sub) { - _consumer = sub; + _consumers.add( sub ); } public AMQSessionModel getSessionModel() @@ -426,12 +437,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen boolean closed = false; State state = getState(); - final ConsumerImpl consumer = getConsumer(); + getSendLock(); - if(consumer != null) - { - consumer.getSendLock(); - } try { while(!closed && state != State.CLOSED) @@ -447,10 +454,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } finally { - if(consumer != null) - { - consumer.releaseSendLock(); - } + releaseSendLock(); } } @@ -493,7 +497,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen if(!updateState(State.SUSPENDED, State.ACTIVE)) { // this is a hack to get round the issue of increasing bytes credit - getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE); + notifyCurrentState(); } } else @@ -502,16 +506,20 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) + protected long sendToClient(final ConsumerImpl consumer, final ServerMessage message, + final InstanceProperties props, + final long deliveryTag) { - return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag); + return _deliveryMethod.deliverToClient(consumer, message, props, deliveryTag); } - protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag) + protected void recordMessageDelivery(final ConsumerImpl consumer, + final MessageInstance entry, + final long deliveryTag) { - _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag); + _recordMethod.recordMessageDelivery(consumer, entry, deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index ff0e4199cf..c1e3d850ef 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; +import java.util.Collection; +import java.util.HashSet; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -32,13 +36,11 @@ import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody> { private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class); @@ -59,7 +61,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); AMQChannel channel = protocolConnection.getChannel(channelId); - VirtualHostImpl vHost = protocolConnection.getVirtualHost(); + VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost(); if (channel == null) { @@ -68,25 +70,55 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic else { channel.sync(); + String queueName = body.getQueue() == null ? null : body.getQueue().asString(); if (_logger.isDebugEnabled()) { - _logger.debug("BasicConsume: from '" + body.getQueue() + + _logger.debug("BasicConsume: from '" + queueName + "' for:" + body.getConsumerTag() + " nowait:" + body.getNowait() + " args:" + body.getArguments()); } - MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString()); + MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName); + final Collection<MessageSource> sources = new HashSet<>(); + if(queue != null) + { + sources.add(queue); + } + else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") + && body.getArguments() != null + && body.getArguments().get("x-multiqueue") instanceof Collection) + { + for(Object object : (Collection<Object>)body.getArguments().get("x-multiqueue")) + { + String sourceName = String.valueOf(object); + sourceName = sourceName.trim(); + if(sourceName.length() != 0) + { + MessageSource source = vHost.getMessageSource(sourceName); + if(source == null) + { + sources.clear(); + break; + } + else + { + sources.add(source); + } + } + } + queueName = body.getArguments().get("x-multiqueue").toString(); + } - if (queue == null) + if (sources.isEmpty()) { if (_logger.isDebugEnabled()) { - _logger.debug("No queue for '" + body.getQueue() + "'"); + _logger.debug("No queue for '" + queueName + "'"); } - if (body.getQueue() != null) + if (queueName != null) { - String msg = "No such queue, '" + body.getQueue() + "'"; + String msg = "No such queue, '" + queueName + "'"; throw body.getChannelException(AMQConstant.NOT_FOUND, msg); } else @@ -114,7 +146,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { AMQShortString consumerTag = channel.consumeFromSource(consumerTagName, - queue, + sources, !body.getNoAck(), body.getArguments(), body.getExclusive(), diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index 58989bbef9..235d263798 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.Collections; import java.util.List; import org.apache.qpid.exchange.ExchangeDefaults; @@ -138,7 +139,9 @@ public class AcknowledgeTest extends QpidTestCase assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); //Subscribe to the queue - AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false); + AMQShortString subscriber = _channel.consumeFromSource(null, + Collections.singleton(_queue), + true, null, true, false); getQueue().deliverAsync(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index cf065de38a..a4402efc84 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; +import java.util.Collections; import java.util.List; import org.apache.qpid.common.AMQPFilterTypes; @@ -143,6 +144,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase FieldTable filters = new FieldTable(); filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - return channel.consumeFromSource(null, queue, true, filters, true, false); + return channel.consumeFromSource(null, Collections.singleton(queue), true, filters, true, false); } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 918a890af5..c5d9a5e35d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -93,7 +93,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget boolean closed = false; State state = getState(); - getConsumer().getSendLock(); + getSendLock(); try { while(!closed && state != State.CLOSED) @@ -108,11 +108,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } finally { - getConsumer().releaseSendLock(); + releaseSendLock(); } } - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // TODO long size = entry.getMessage().getSize(); @@ -515,7 +515,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget @Override public void consumerRemoved(final ConsumerImpl sub) { - + close(); } @Override diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index 34f08615ad..8a3ef65979 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -1065,6 +1065,12 @@ class ManagementNode implements MessageSource, MessageDestination } @Override + public ConsumerImpl getAcquiringConsumer() + { + return null; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index a3b1f932ac..3f873a24ff 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.management.amqp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageSource; @@ -27,19 +31,12 @@ import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.StateChangeListener; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - class ManagementNodeConsumer implements ConsumerImpl { private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement(); private final ManagementNode _managementNode; private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>()); private final ConsumerTarget _target; - private final Lock _stateChangeLock = new ReentrantLock(); private final String _name; private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener(); @@ -49,7 +46,7 @@ class ManagementNodeConsumer implements ConsumerImpl _name = consumerName; _managementNode = managementNode; _target = target; - target.setStateListener(_targetChangeListener); + target.addStateListener(_targetChangeListener); } @Override @@ -133,19 +130,19 @@ class ManagementNodeConsumer implements ConsumerImpl @Override public boolean trySendLock() { - return _stateChangeLock.tryLock(); + return _target.trySendLock(); } @Override public void getSendLock() { - _stateChangeLock.lock(); + _target.getSendLock(); } @Override public void releaseSendLock() { - _stateChangeLock.unlock(); + _target.releaseSendLock(); } @@ -174,13 +171,13 @@ class ManagementNodeConsumer implements ConsumerImpl void send(final InternalMessage response) { - getSendLock(); + _target.getSendLock(); try { final ManagementResponse responseEntry = new ManagementResponse(this, response); if(_queue.isEmpty() && _target.allocateCredit(response)) { - _target.send(responseEntry,false); + _target.send(this, responseEntry, false); } else { @@ -189,7 +186,7 @@ class ManagementNodeConsumer implements ConsumerImpl } finally { - releaseSendLock(); + _target.releaseSendLock(); } } @@ -209,7 +206,7 @@ class ManagementNodeConsumer implements ConsumerImpl private void deliverMessages() { - getSendLock(); + _target.getSendLock(); try { while(!_queue.isEmpty()) @@ -219,7 +216,7 @@ class ManagementNodeConsumer implements ConsumerImpl if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage())) { _queue.remove(0); - _target.send(managementResponse,false); + _target.send(this, managementResponse, false); } else { @@ -229,7 +226,7 @@ class ManagementNodeConsumer implements ConsumerImpl } finally { - releaseSendLock(); + _target.releaseSendLock(); } } } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index 03e7eab61b..501ce40db7 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -84,6 +84,12 @@ class ManagementResponse implements MessageInstance } @Override + public ConsumerImpl getAcquiringConsumer() + { + return _consumer; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return consumer == _consumer && !isDeleted(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0183c30276..9bdcb9e83f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -683,47 +683,48 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic int type = resolveAddressType(dest); - + boolean resolved = false; switch (type) { case AMQDestination.QUEUE_TYPE: - { + + setLegacyFieldsForQueueType(dest); if(createNode) { - setLegacyFieldsForQueueType(dest); handleQueueNodeCreation(dest,noLocal); - break; + resolved = true; } else if (isQueueExist(dest,assertNode)) { - setLegacyFieldsForQueueType(dest); - break; + resolved = true; } - } + break; case AMQDestination.TOPIC_TYPE: - { + + setLegacyFieldsForTopicType(dest); if(createNode) { - setLegacyFieldsForTopicType(dest); verifySubject(dest); handleExchangeNodeCreation(dest); - break; + resolved = true; } else if (isExchangeExist(dest,assertNode)) { - setLegacyFieldsForTopicType(dest); verifySubject(dest); - break; + resolved = true; } - } + break; default: throw new AMQException( "The name '" + dest.getAddressName() + "' supplied in the address doesn't resolve to an exchange or a queue"); } - dest.setAddressResolved(System.currentTimeMillis()); + if(resolved) + { + dest.setAddressResolved(System.currentTimeMillis()); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 68b7cf1f88..dc1f9a719e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -645,18 +645,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic Link link = destination.getLink(); if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) { - arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs()); + arguments.putAll(link.getSubscription().getArgs()); } boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; + String queue = queueName == null ? destination.getAddressName() : queueName.toString(); getQpidSession().messageSubscribe - (queueName.toString(), String.valueOf(tag), + (queue, String.valueOf(tag), acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); - String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); + String consumerTag = (consumer).getConsumerTagString(); if (capacity == 0) { @@ -1175,7 +1176,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } catch(SessionException e) { - if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) + if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED + || e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND) { match = false; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 0145d15111..b0615ea99f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -514,6 +514,16 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { queueName = preprocessAddressTopic(consumer, queueName); + AMQDestination destination = consumer.getDestination(); + + Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + + Link link = destination.getLink(); + if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) + { + arguments.putAll(link.getSubscription().getArgs()); + } + BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, new AMQShortString(String.valueOf(tag)), @@ -521,7 +531,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, consumer.isExclusive(), nowait, - consumer.getArguments()); + FieldTable.convertToFieldTable(arguments)); AMQFrame jmsConsume = body.generateFrame(getChannelId()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 99154e820f..91e2143c47 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -43,7 +43,8 @@ public class AddressHelper public static final String LINK = "link"; public static final String X_DECLARE = "x-declare"; public static final String X_BINDINGS = "x-bindings"; - public static final String X_SUBSCRIBE = "x-subscribes"; + public static final String X_SUBSCRIBES = "x-subscribes"; + public static final String X_SUBSCRIBE = "x-subscribe"; public static final String CREATE = "create"; public static final String ASSERT = "assert"; public static final String DELETE = "delete"; @@ -265,19 +266,32 @@ public class AddressHelper Map linkMap = (Map) _address.getOptions().get(LINK); - if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE)) - { - Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE); - - if (x_subscribe.containsKey(ARGUMENTS)) + if (linkMap != null) + { + Map x_subscribe = null; + + if(linkMap.containsKey(X_SUBSCRIBE)) { - link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS)); + x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE); + } + else if(linkMap.containsKey(X_SUBSCRIBES)) + { + // left in for backwards compatibility with old broken constant + x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBES); + } + + if(x_subscribe != null) + { + if (x_subscribe.containsKey(ARGUMENTS)) + { + link.getSubscription().setArgs((Map<String, Object>) x_subscribe.get(ARGUMENTS)); + } + + boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? + Boolean.parseBoolean((String) x_subscribe.get(EXCLUSIVE)) : false; + + link.getSubscription().setExclusive(exclusive); } - - boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? - Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false; - - link.getSubscription().setExclusive(exclusive); } link.setBindings(getBindings(linkMap)); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index a614690f83..7e9cb3072a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -28,7 +28,7 @@ import java.util.Map; import org.apache.qpid.client.AMQDestination.Binding; public class Link -{ +{ public enum FilterType { SQL92, XQUERY, SUBJECT } public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE } diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 391498194b..6fef0cd553 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -96,25 +96,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // create never -------------------------------------------- String addr1 = "ADDR:testQueue1"; AMQDestination dest = new AMQAnyDestination(addr1); - final String queueErrorMessage = "The name 'testQueue1' supplied in the address " + - "doesn't resolve to an exchange or a queue"; try { cons = jmsSession.createConsumer(dest); } catch(JMSException e) { - assertTrue(e.getMessage().contains(queueErrorMessage)); - } + // pass + _connection = getConnection(); + jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + dest = new AMQAnyDestination(addr1); - try - { - prod = jmsSession.createProducer(dest); - } - catch(JMSException e) - { - assertTrue(e.getCause().getMessage().contains(queueErrorMessage) - || e.getCause().getCause().getMessage().contains(queueErrorMessage)); } assertFalse("Queue should not be created",( @@ -170,7 +162,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getMessage().contains(testQueue3ErrorMessage)); + // pass } try @@ -179,8 +171,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getCause().getMessage().contains(testQueue3ErrorMessage) - || e.getCause().getCause().getMessage().contains(testQueue3ErrorMessage)); + // pass + _connection = getConnection(); + jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + dest = new AMQAnyDestination(addr1); } assertFalse("Queue should not be created",( @@ -196,8 +190,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getMessage().contains(testQueue3ErrorMessage)); + // pass + _connection = getConnection(); + jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + dest = new AMQAnyDestination(addr1); } + assertFalse("Queue should not be created",( (AMQSession)jmsSession).isQueueExist(dest, false)); @@ -610,7 +608,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // Using the ADDR method // default case - queue = ssn.createQueue("ADDR:my-queue2"); + queue = ssn.createQueue("ADDR:my-queue2 ; { assert : sender }"); try { prod = ssn.createProducer(queue); @@ -618,9 +616,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(Exception e) { - String s = "The name 'my-queue2' supplied in the address " + - "doesn't resolve to an exchange or a queue"; - assertTrue(s.equals(e.getCause().getMessage()) || s.equals(e.getCause().getCause().getMessage())); + String s = "Assert failed for queue : my-queue2"; + assertTrue(e.getCause().getMessage().contains(s) || e.getCause().getCause().getMessage().contains(s)); } // explicit create case |
