summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-26 17:01:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-26 17:01:07 +0000
commit1635ca5849b7c765d5d7be9cd01d46b06349f320 (patch)
treef304353182e02369661b8ecfdde357a288b183e3
parent8328e0398707d2cccdadb95fe1b4c4563b930cc1 (diff)
downloadqpid-python-1635ca5849b7c765d5d7be9cd01d46b06349f320.tar.gz
QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620659 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java50
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java20
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java8
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java12
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java32
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java66
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java15
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java15
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java11
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java84
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java79
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java64
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java54
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java3
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java8
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java6
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java31
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java2
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java37
31 files changed, 516 insertions, 252 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index aa721e598a..192164ca6b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -20,16 +20,24 @@
*/
package org.apache.qpid.server.consumer;
-import org.apache.qpid.server.util.StateChangeListener;
-
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpid.server.util.StateChangeListener;
public abstract class AbstractConsumerTarget implements ConsumerTarget
{
private final AtomicReference<State> _state;
- private final AtomicReference<StateChangeListener<ConsumerTarget, State>> _stateListener =
- new AtomicReference<StateChangeListener<ConsumerTarget, State>>();
+
+ private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
+ CopyOnWriteArraySet<>();
+
+ private final Lock _stateChangeLock = new ReentrantLock();
+
protected AbstractConsumerTarget(final State initialState)
{
@@ -46,8 +54,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
{
if(_state.compareAndSet(from, to))
{
- StateChangeListener<ConsumerTarget, State> listener = _stateListener.get();
- if(listener != null)
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
{
listener.stateChanged(this, from, to);
}
@@ -59,15 +66,38 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
}
}
+ public final void notifyCurrentState()
+ {
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ {
+ State state = getState();
+ listener.stateChanged(this, state, state);
+ }
+ }
+ public final void addStateListener(StateChangeListener<ConsumerTarget, State> listener)
+ {
+ _stateChangeListeners.add(listener);
+ }
+
+ @Override
+ public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
+ {
+ _stateChangeListeners.remove(listener);
+ }
+
+ public final boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
- public final void setStateListener(StateChangeListener<ConsumerTarget, State> listener)
+ public final void getSendLock()
{
- _stateListener.set(listener);
+ _stateChangeLock.lock();
}
- public final StateChangeListener<ConsumerTarget, State> getStateListener()
+ public final void releaseSendLock()
{
- return _stateListener.get();
+ _stateChangeLock.unlock();
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index b7be1bfd9b..5aef922da5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -31,6 +31,8 @@ public interface ConsumerTarget
void acquisitionRemoved(MessageInstance node);
+ void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
@@ -42,7 +44,7 @@ public interface ConsumerTarget
void consumerRemoved(ConsumerImpl sub);
- void setStateListener(StateChangeListener<ConsumerTarget, State> listener);
+ void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
long getUnacknowledgedBytes();
@@ -50,7 +52,7 @@ public interface ConsumerTarget
AMQSessionModel getSessionModel();
- long send(MessageInstance entry, boolean batch);
+ long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
void flushBatched();
@@ -65,4 +67,11 @@ public interface ConsumerTarget
boolean isSuspended();
boolean close();
+
+ boolean trySendLock();
+
+ void getSendLock();
+
+ void releaseSendLock();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 1bf451948d..d3518f428b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -79,6 +79,8 @@ public interface MessageInstance
Filterable asFilterable();
+ ConsumerImpl getAcquiringConsumer();
+
public static enum State
{
AVAILABLE,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index 71b7636159..02cd7ff56f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Consumer;
@@ -52,4 +53,6 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl,
MessageInstance.ConsumerAcquiredState<X> getOwningState();
QueueContext getQueueContext();
+
+ ConsumerTarget getTarget();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 4044c938db..60bad7bf1c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -31,8 +31,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
@@ -65,7 +63,6 @@ class QueueConsumerImpl
private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _consumerNumber;
- private final Lock _stateChangeLock = new ReentrantLock();
private final long _createTime = System.currentTimeMillis();
private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this);
private final boolean _acquires;
@@ -90,6 +87,8 @@ class QueueConsumerImpl
private final ConsumerTarget _target;
private final SubFlushRunner _runner = new SubFlushRunner(this);
+ private final StateChangeListener<ConsumerTarget, ConsumerTarget.State>
+ _listener;
private volatile QueueContext _queueContext;
private StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
{
@@ -134,17 +133,17 @@ class QueueConsumerImpl
setupLogging();
- _target.setStateListener(
- new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- targetStateChanged(oldState, newState);
- }
- });
+ _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
+ {
+ @Override
+ public void stateChanged(final ConsumerTarget object,
+ final ConsumerTarget.State oldState,
+ final ConsumerTarget.State newState)
+ {
+ targetStateChanged(oldState, newState);
+ }
+ };
+ _target.addStateListener(_listener);
}
private static Map<String, Object> createAttributeMap(String name, FilterManager filters, EnumSet<Option> optionSet)
@@ -202,6 +201,7 @@ class QueueConsumerImpl
}
}
+ @Override
public ConsumerTarget getTarget()
{
return _target;
@@ -248,17 +248,17 @@ class QueueConsumerImpl
{
if(_closed.compareAndSet(false,true))
{
- getSendLock();
+ _target.getSendLock();
try
{
- _target.close();
_target.consumerRemoved(this);
+ _target.removeStateChangeListener(_listener);
_queue.unregisterConsumer(this);
deleted();
}
finally
{
- releaseSendLock();
+ _target.releaseSendLock();
}
}
@@ -420,17 +420,17 @@ class QueueConsumerImpl
public final boolean trySendLock()
{
- return _stateChangeLock.tryLock();
+ return getTarget().trySendLock();
}
public final void getSendLock()
{
- _stateChangeLock.lock();
+ getTarget().getSendLock();
}
public final void releaseSendLock()
{
- _stateChangeLock.unlock();
+ getTarget().releaseSendLock();
}
public final long getCreateTime()
@@ -471,7 +471,7 @@ class QueueConsumerImpl
public final void send(final QueueEntry entry, final boolean batch)
{
_deliveredCount.incrementAndGet();
- long size = _target.send(entry, batch);
+ long size = _target.send(this, entry, batch);
_deliveredBytes.addAndGet(size);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 6c541d78ef..a0f2dc798d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -247,6 +247,26 @@ public abstract class QueueEntryImpl implements QueueEntry
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ ConsumerImpl consumer;
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState)
+ {
+ consumer = ((ConsumerAcquiredState)state).getConsumer();
+ }
+ else if(state instanceof LockedAcquiredState)
+ {
+ consumer = ((LockedAcquiredState)state).getConsumer();
+ }
+ else
+ {
+ consumer = null;
+ }
+ return consumer;
+ }
+
+ @Override
public boolean isAcquiredBy(ConsumerImpl consumer)
{
EntryState state = _state;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index 30f6fa66c6..e784126d24 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -21,18 +21,18 @@
package org.apache.qpid.server.queue;
import java.security.PrivilegedAction;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.Subject;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
-import javax.security.auth.Subject;
-
/**
* QueueRunners are Runnables used to process a queue when requiring
* asynchronous message delivery to consumers, which is necessary
@@ -111,7 +111,7 @@ public class QueueRunner implements Runnable
public String toString()
{
- return "QueueRunner-" + _queue.getLogSubject();
+ return "QueueRunner-" + _queue.getLogSubject().toLogString();
}
public void execute()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index 910727ce42..750db4aba5 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -21,18 +21,18 @@
package org.apache.qpid.server.queue;
+import java.security.PrivilegedAction;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.security.auth.Subject;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
-import javax.security.auth.Subject;
-import java.security.PrivilegedAction;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
class SubFlushRunner implements Runnable
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index ad33ecadcf..1482576748 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -167,7 +167,7 @@ public class MockConsumer implements ConsumerTarget
{
}
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
long size = entry.getMessage().getSize();
if (messages.contains(entry))
@@ -202,7 +202,7 @@ public class MockConsumer implements ConsumerTarget
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
-
+ close();
}
public void setState(State state)
@@ -216,11 +216,20 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public void setStateListener(final StateChangeListener<ConsumerTarget, State> listener)
+ public void addStateListener(final StateChangeListener<ConsumerTarget, State> listener)
{
_listener = listener;
}
+ @Override
+ public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
+ {
+ if(_listener == listener)
+ {
+ _listener = null;
+ }
+ }
+
public ArrayList<MessageInstance> getMessages()
{
return messages;
@@ -242,6 +251,23 @@ public class MockConsumer implements ConsumerTarget
_isActive = isActive;
}
+
+ public final boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
+ public final void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public final void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+
private static class MockSessionModel implements AMQSessionModel
{
private final UUID _id = UUID.randomUUID();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 37c4eeb127..08c0de95d5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -60,6 +60,12 @@ public class MockMessageInstance implements MessageInstance
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index f13886d2b2..7fc36f39bb 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -205,12 +205,13 @@ public class StandardQueueTest extends AbstractQueueTestBase
{
/**
* Send a message and decrement latch
+ * @param consumer
* @param entry
* @param batch
*/
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
- long size = super.send(entry, batch);
+ long size = super.send(consumer, entry, batch);
latch.countDown();
return size;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index db7ee54cb2..088c80c219 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -22,7 +22,9 @@ package org.apache.qpid.server.protocol.v0_10;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,6 +39,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -77,7 +80,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final Map<String, Object> _arguments;
private int _deferredMessageCredit;
private long _deferredSizeCredit;
- private ConsumerImpl _consumer;
+ private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
public ConsumerTarget_0_10(ServerSession session,
@@ -101,11 +104,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_name = name;
}
- public ConsumerImpl getConsumer()
- {
- return _consumer;
- }
-
public boolean isSuspended()
{
return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
@@ -116,11 +114,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
boolean closed = false;
State state = getState();
- final ConsumerImpl consumer = getConsumer();
- if(consumer != null)
- {
- consumer.getSendLock();
- }
+ getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -135,10 +129,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
}
finally
{
- if(consumer != null)
- {
- consumer.releaseSendLock();
- }
+ releaseSendLock();
}
return closed;
@@ -153,7 +144,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
if(!updateState(State.SUSPENDED, State.ACTIVE))
{
// this is a hack to get round the issue of increasing bytes credit
- getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ notifyCurrentState();
}
}
else
@@ -200,7 +191,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public long send(final MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
ServerMessage serverMsg = entry.getMessage();
@@ -303,12 +294,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
- xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED)
- : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body);
+ xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, body, BATCHED)
+ : new MessageTransfer(_name,_acceptMode,_acquireMode,header, body);
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
- xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, consumer, _session, entry, _flowMode == MessageFlowMode.WINDOW));
}
else if(_flowMode == MessageFlowMode.WINDOW)
{
@@ -325,11 +316,11 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_postIdSettingAction.setXfr(xfr);
if(_acceptMode == MessageAcceptMode.EXPLICIT)
{
- _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
+ _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer));
}
else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
- _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
+ _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer));
}
else
{
@@ -401,12 +392,23 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
entry.setRedelivered();
entry.routeToAlternate(null, null);
- if(entry.isAcquiredBy(getConsumer()))
+ if(isAcquiredByConsumer(entry))
{
entry.delete();
}
}
+ private boolean isAcquiredByConsumer(final MessageInstance entry)
+ {
+ ConsumerImpl acquiringConsumer = entry.getAcquiringConsumer();
+ if(acquiringConsumer instanceof QueueConsumer)
+ {
+ return ((QueueConsumer)acquiringConsumer).getTarget() == this;
+ }
+
+ return false;
+ }
+
void release(final MessageInstance entry, final boolean setRedelivered)
{
if (setRedelivered)
@@ -503,7 +505,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
try
{
- getConsumer().getSendLock();
+ getSendLock();
updateState(State.ACTIVE, State.SUSPENDED);
_stopped.set(true);
@@ -512,7 +514,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
}
finally
{
- getConsumer().releaseSendLock();
+ releaseSendLock();
}
}
@@ -572,7 +574,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
public boolean deleteAcquired(MessageInstance entry)
{
- if(entry.isAcquiredBy(getConsumer()))
+ if(isAcquiredByConsumer(entry))
{
acquisitionRemoved(entry);
entry.delete();
@@ -594,7 +596,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
public void flush()
{
flushCreditState(true);
- getConsumer().flush();
+ for(ConsumerImpl consumer : _consumers)
+ {
+ consumer.flush();
+ }
stop();
}
@@ -626,12 +631,17 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
@Override
public void consumerAdded(final ConsumerImpl sub)
{
- _consumer = sub;
+ _consumers.add(sub);
}
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
+ _consumers.remove(sub);
+ if(_consumers.isEmpty())
+ {
+ close();
+ }
}
public long getUnacknowledgedBytes()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 95dba11ea0..66f8c97063 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -32,16 +33,20 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private final MessageInstance _entry;
private final ConsumerTarget_0_10 _target;
+ private final ConsumerImpl _consumer;
- public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
+ public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
+ ConsumerTarget_0_10 target,
+ final ConsumerImpl consumer)
{
_entry = entry;
_target = target;
+ _consumer = consumer;
}
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition())
+ if(_target != null && _entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
{
_target.getSessionModel().acknowledge(_target, _entry);
}
@@ -54,7 +59,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+ if(_target != null && _entry.isAcquiredBy(_consumer))
{
_target.release(_entry, setRedelivered);
}
@@ -66,7 +71,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+ if(_target != null && _entry.isAcquiredBy(_consumer))
{
_target.reject(_entry);
}
@@ -79,7 +84,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- return _entry.acquire(_target.getConsumer());
+ return _entry.acquire(_consumer);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
index c459364dbb..5467a57fa1 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -30,12 +31,16 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private final MessageInstance _entry;
- private ConsumerTarget_0_10 _target;
+ private final ConsumerTarget_0_10 _target;
+ private final ConsumerImpl _consumer;
- public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
+ public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
+ ConsumerTarget_0_10 target,
+ final ConsumerImpl consumer)
{
_entry = entry;
_target = target;
+ _consumer = consumer;
}
public void onAccept()
@@ -45,7 +50,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_target.getConsumer()))
+ if(_entry.isAcquiredBy(_consumer))
{
_target.release(_entry, setRedelivered);
}
@@ -57,7 +62,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- if(_entry.isAcquiredBy(_target.getConsumer()))
+ if(_entry.isAcquiredBy(_consumer))
{
_target.reject(_entry);
}
@@ -70,7 +75,7 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- boolean acquired = _entry.acquire(_target.getConsumer());
+ boolean acquired = _entry.acquire(_consumer);
if(acquired)
{
_target.recordUnacknowledged(_entry);
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index 7917b7989a..d581d146a8 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.transport.Method;
@@ -29,16 +30,22 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
+ private final ConsumerImpl _consumer;
private long _messageSize;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub,
+ final ConsumerImpl consumer,
+ ServerSession session,
+ MessageInstance entry,
+ boolean restoreCredit)
{
super();
_sub = sub;
_entry = entry;
_session = session;
_restoreCredit = restoreCredit;
+ _consumer = consumer;
if(restoreCredit)
{
_messageSize = entry.getMessage().getSize();
@@ -51,7 +58,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
{
_sub.getCreditManager().restoreCredit(1l, _messageSize);
}
- if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition())
+ if(_entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
{
_session.acknowledge(_sub, _entry);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 5adeba66b1..14082091f9 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -25,6 +25,7 @@ import java.security.AccessControlException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -198,15 +199,44 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
String queueName = method.getQueue();
- VirtualHostImpl vhost = getVirtualHost(session);
+ VirtualHostImpl<?,?,?> vhost = getVirtualHost(session);
+ final Collection<MessageSource> sources = new HashSet<>();
final MessageSource queue = vhost.getMessageSource(queueName);
+ if(queue != null)
+ {
+ sources.add(queue);
+ }
+ else if(vhost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && method.getArguments() != null
+ && method.getArguments().get("x-multiqueue") instanceof Collection)
+ {
+ for(Object object : (Collection<Object>)method.getArguments().get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if(sourceName.length() != 0)
+ {
+ MessageSource source = vhost.getMessageSource(sourceName);
+ if(source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = method.getArguments().get("x-multiqueue").toString();
+ }
- if(queue == null)
+ if(sources.isEmpty())
{
- exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(!queue.verifySessionAccess((ServerSession)session))
+ else if(!verifySessionAccess((ServerSession) session, sources))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -250,12 +280,15 @@ public class ServerSessionDelegate extends SessionDelegate
{
options.add(ConsumerImpl.Option.EXCLUSIVE);
}
- ((ServerSession)session).register(
- queue.addConsumer(target,
- filterManager,
- MessageTransferMessage.class,
- destination,
- options));
+ for(MessageSource source : sources)
+ {
+ ((ServerSession) session).register(
+ source.addConsumer(target,
+ filterManager,
+ MessageTransferMessage.class,
+ destination,
+ options));
+ }
}
catch (AMQQueue.ExistingExclusiveConsumer existing)
{
@@ -278,6 +311,23 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
+ protected boolean verifySessionAccess(final ServerSession session, final Collection<MessageSource> queues)
+ {
+ for(MessageSource source : queues)
+ {
+ if(!verifySessionAccess(session, source))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected boolean verifySessionAccess(final ServerSession session, final MessageSource queue)
+ {
+ return queue.verifySessionAccess(session);
+ }
+
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
@@ -820,17 +870,15 @@ public class ServerSessionDelegate extends SessionDelegate
return destination;
}
- private VirtualHostImpl getVirtualHost(Session session)
+ private VirtualHostImpl<?,?,?> getVirtualHost(Session session)
{
ServerConnection conn = getServerConnection(session);
- VirtualHostImpl vhost = conn.getVirtualHost();
- return vhost;
+ return conn.getVirtualHost();
}
private ServerConnection getServerConnection(Session session)
{
- ServerConnection conn = (ServerConnection) session.getConnection();
- return conn;
+ return (ServerConnection) session.getConnection();
}
@Override
@@ -1238,7 +1286,7 @@ public class ServerSessionDelegate extends SessionDelegate
exception(session, method, errorCode, description);
}
- else if (!queue.verifySessionAccess((ServerSession)session))
+ else if (!verifySessionAccess((ServerSession) session, queue))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1296,7 +1344,7 @@ public class ServerSessionDelegate extends SessionDelegate
catch(QueueExistsException qe)
{
queue = qe.getExistingQueue();
- if (!queue.verifySessionAccess((ServerSession)session))
+ if (!verifySessionAccess((ServerSession) session, queue))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1357,7 +1405,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- if(!queue.verifySessionAccess((ServerSession)session))
+ if(!verifySessionAccess((ServerSession) session, queue))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 49bc26149e..d1ec2e139e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -61,6 +61,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -543,10 +544,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
- public ConsumerImpl getSubscription(AMQShortString tag)
+ public ConsumerTarget getSubscription(AMQShortString tag)
{
- final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
- return target == null ? null : target.getConsumer();
+ return _tag2SubscriptionTargetMap.get(tag);
}
/**
@@ -555,7 +555,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
*
*
* @param tag the tag chosen by the client (if null, server will generate one)
- * @param source the queue to subscribe to
+ * @param sources the queues to subscribe to
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
@@ -564,7 +564,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
*
* @throws org.apache.qpid.AMQException if something goes wrong
*/
- public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+ public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
@@ -632,18 +632,21 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
});
}
- ConsumerImpl sub =
- source.addConsumer(target,
- filterManager,
- AMQMessage.class,
- AMQShortString.toString(tag),
- options);
- if(sub instanceof Consumer<?>)
+ for(MessageSource source : sources)
{
- final Consumer<?> modelConsumer = (Consumer<?>) sub;
- consumerAdded(modelConsumer);
- modelConsumer.addChangeListener(_consumerClosedListener);
- _consumers.add(modelConsumer);
+ ConsumerImpl sub =
+ source.addConsumer(target,
+ filterManager,
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options);
+ if (sub instanceof Consumer<?>)
+ {
+ final Consumer<?> modelConsumer = (Consumer<?>) sub;
+ consumerAdded(modelConsumer);
+ modelConsumer.addChangeListener(_consumerClosedListener);
+ _consumers.add(modelConsumer);
+ }
}
}
catch (AccessControlException e)
@@ -683,13 +686,16 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
- ConsumerImpl sub = target == null ? null : target.getConsumer();
- if (sub != null)
+ Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers();
+ if (subs != null)
{
- sub.close();
- if(sub instanceof Consumer<?>)
+ for(ConsumerImpl sub : subs)
{
- _consumers.remove(sub);
+ sub.close();
+ if (sub instanceof Consumer<?>)
+ {
+ _consumers.remove(sub);
+ }
}
return true;
}
@@ -763,11 +769,14 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- ConsumerImpl sub = me.getValue().getConsumer();
+ Collection<ConsumerImpl> subs = me.getValue().getConsumers();
- if(sub != null)
+ if(subs != null)
{
- sub.close();
+ for(ConsumerImpl sub : subs)
+ {
+ sub.close();
+ }
}
}
@@ -1032,7 +1041,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// may need to deliver queued messages
for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
- s.getConsumer().externalStateChange();
+ for(ConsumerImpl sub : s.getConsumers())
+ {
+ sub.externalStateChange();
+ }
}
}
@@ -1050,11 +1062,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
try
{
- s.getConsumer().getSendLock();
+ s.getSendLock();
}
finally
{
- s.getConsumer().releaseSendLock();
+ s.releaseSendLock();
}
}
}
@@ -1133,8 +1145,8 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// ensure all subscriptions have seen the change to the channel state
for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getConsumer().getSendLock();
- sub.getConsumer().releaseSendLock();
+ sub.getSendLock();
+ sub.releaseSendLock();
}
try
@@ -1169,9 +1181,12 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
if(requiresSuspend)
{
_suspended.set(false);
- for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
+ for(ConsumerTarget_0_8 target : _tag2SubscriptionTargetMap.values())
{
- sub.getConsumer().externalStateChange();
+ for(ConsumerImpl sub : target.getConsumers())
+ {
+ sub.externalStateChange();
+ }
}
}
@@ -1179,7 +1194,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
public String toString()
{
- return "["+_session.toString()+":"+_channelId+"]";
+ return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]";
}
public void setDefaultQueue(AMQQueue queue)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index d5eed242e7..acb74c99e6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -73,7 +75,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
- private ConsumerImpl _consumer;
+ private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -93,6 +95,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
+ public List<ConsumerImpl> getConsumers()
+ {
+ return _consumers;
+ }
+
static final class BrowserConsumer extends ConsumerTarget_0_8
{
public BrowserConsumer(AMQChannel channel,
@@ -111,12 +118,14 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* thread safe.
*
*
+ *
+ * @param consumer
* @param entry
* @param batch
* @throws org.apache.qpid.AMQException
*/
@Override
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -124,7 +133,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
}
@@ -173,11 +182,12 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
+ * @param consumer
* @param entry The message to send
* @param batch
*/
@Override
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -200,7 +210,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
- size = sendToClient(message, props, deliveryTag);
+ size = sendToClient(consumer, message, props, deliveryTag);
}
ref.release();
@@ -287,11 +297,12 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
+ * @param consumer
* @param entry The message to send
* @param batch
*/
@Override
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
@@ -301,9 +312,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
long deliveryTag = getChannel().getNextDeliveryTag();
addUnacknowledgedMessage(entry);
- recordMessageDelivery(entry, deliveryTag);
+ recordMessageDelivery(consumer, entry, deliveryTag);
entry.addStateChangeListener(getReleasedStateChangeListener());
- long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
return size;
}
@@ -366,20 +377,20 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
}
- public ConsumerImpl getConsumer()
- {
- return _consumer;
- }
-
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
+ _consumers.remove(sub);
+ if(_consumers.isEmpty())
+ {
+ close();
+ }
}
@Override
public void consumerAdded(final ConsumerImpl sub)
{
- _consumer = sub;
+ _consumers.add( sub );
}
public AMQSessionModel getSessionModel()
@@ -426,12 +437,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
boolean closed = false;
State state = getState();
- final ConsumerImpl consumer = getConsumer();
+ getSendLock();
- if(consumer != null)
- {
- consumer.getSendLock();
- }
try
{
while(!closed && state != State.CLOSED)
@@ -447,10 +454,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
finally
{
- if(consumer != null)
- {
- consumer.releaseSendLock();
- }
+ releaseSendLock();
}
}
@@ -493,7 +497,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
if(!updateState(State.SUSPENDED, State.ACTIVE))
{
// this is a hack to get round the issue of increasing bytes credit
- getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ notifyCurrentState();
}
}
else
@@ -502,16 +506,20 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
}
- protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
+ protected long sendToClient(final ConsumerImpl consumer, final ServerMessage message,
+ final InstanceProperties props,
+ final long deliveryTag)
{
- return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
+ return _deliveryMethod.deliverToClient(consumer, message, props, deliveryTag);
}
- protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag)
+ protected void recordMessageDelivery(final ConsumerImpl consumer,
+ final MessageInstance entry,
+ final long deliveryTag)
{
- _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag);
+ _recordMethod.recordMessageDelivery(consumer, entry, deliveryTag);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index ff0e4199cf..c1e3d850ef 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.HashSet;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -32,13 +36,11 @@ import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-
public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
{
private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class);
@@ -59,7 +61,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
AMQChannel channel = protocolConnection.getChannel(channelId);
- VirtualHostImpl vHost = protocolConnection.getVirtualHost();
+ VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost();
if (channel == null)
{
@@ -68,25 +70,55 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
else
{
channel.sync();
+ String queueName = body.getQueue() == null ? null : body.getQueue().asString();
if (_logger.isDebugEnabled())
{
- _logger.debug("BasicConsume: from '" + body.getQueue() +
+ _logger.debug("BasicConsume: from '" + queueName +
"' for:" + body.getConsumerTag() +
" nowait:" + body.getNowait() +
" args:" + body.getArguments());
}
- MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+ MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName);
+ final Collection<MessageSource> sources = new HashSet<>();
+ if(queue != null)
+ {
+ sources.add(queue);
+ }
+ else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && body.getArguments() != null
+ && body.getArguments().get("x-multiqueue") instanceof Collection)
+ {
+ for(Object object : (Collection<Object>)body.getArguments().get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if(sourceName.length() != 0)
+ {
+ MessageSource source = vHost.getMessageSource(sourceName);
+ if(source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = body.getArguments().get("x-multiqueue").toString();
+ }
- if (queue == null)
+ if (sources.isEmpty())
{
if (_logger.isDebugEnabled())
{
- _logger.debug("No queue for '" + body.getQueue() + "'");
+ _logger.debug("No queue for '" + queueName + "'");
}
- if (body.getQueue() != null)
+ if (queueName != null)
{
- String msg = "No such queue, '" + body.getQueue() + "'";
+ String msg = "No such queue, '" + queueName + "'";
throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
else
@@ -114,7 +146,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
- queue,
+ sources,
!body.getNoAck(),
body.getArguments(),
body.getExclusive(),
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index 58989bbef9..235d263798 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.Collections;
import java.util.List;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -138,7 +139,9 @@ public class AcknowledgeTest extends QpidTestCase
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false);
+ AMQShortString subscriber = _channel.consumeFromSource(null,
+ Collections.singleton(_queue),
+ true, null, true, false);
getQueue().deliverAsync();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index cf065de38a..a4402efc84 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.Collections;
import java.util.List;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -143,6 +144,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.consumeFromSource(null, queue, true, filters, true, false);
+ return channel.consumeFromSource(null, Collections.singleton(queue), true, filters, true, false);
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 918a890af5..c5d9a5e35d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -93,7 +93,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
boolean closed = false;
State state = getState();
- getConsumer().getSendLock();
+ getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -108,11 +108,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
finally
{
- getConsumer().releaseSendLock();
+ releaseSendLock();
}
}
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// TODO
long size = entry.getMessage().getSize();
@@ -515,7 +515,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
-
+ close();
}
@Override
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 34f08615ad..8a3ef65979 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1065,6 +1065,12 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index a3b1f932ac..3f873a24ff 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.management.amqp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageSource;
@@ -27,19 +31,12 @@ import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.StateChangeListener;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
class ManagementNodeConsumer implements ConsumerImpl
{
private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
private final ManagementNode _managementNode;
private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
private final ConsumerTarget _target;
- private final Lock _stateChangeLock = new ReentrantLock();
private final String _name;
private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener();
@@ -49,7 +46,7 @@ class ManagementNodeConsumer implements ConsumerImpl
_name = consumerName;
_managementNode = managementNode;
_target = target;
- target.setStateListener(_targetChangeListener);
+ target.addStateListener(_targetChangeListener);
}
@Override
@@ -133,19 +130,19 @@ class ManagementNodeConsumer implements ConsumerImpl
@Override
public boolean trySendLock()
{
- return _stateChangeLock.tryLock();
+ return _target.trySendLock();
}
@Override
public void getSendLock()
{
- _stateChangeLock.lock();
+ _target.getSendLock();
}
@Override
public void releaseSendLock()
{
- _stateChangeLock.unlock();
+ _target.releaseSendLock();
}
@@ -174,13 +171,13 @@ class ManagementNodeConsumer implements ConsumerImpl
void send(final InternalMessage response)
{
- getSendLock();
+ _target.getSendLock();
try
{
final ManagementResponse responseEntry = new ManagementResponse(this, response);
if(_queue.isEmpty() && _target.allocateCredit(response))
{
- _target.send(responseEntry,false);
+ _target.send(this, responseEntry, false);
}
else
{
@@ -189,7 +186,7 @@ class ManagementNodeConsumer implements ConsumerImpl
}
finally
{
- releaseSendLock();
+ _target.releaseSendLock();
}
}
@@ -209,7 +206,7 @@ class ManagementNodeConsumer implements ConsumerImpl
private void deliverMessages()
{
- getSendLock();
+ _target.getSendLock();
try
{
while(!_queue.isEmpty())
@@ -219,7 +216,7 @@ class ManagementNodeConsumer implements ConsumerImpl
if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
{
_queue.remove(0);
- _target.send(managementResponse,false);
+ _target.send(this, managementResponse, false);
}
else
{
@@ -229,7 +226,7 @@ class ManagementNodeConsumer implements ConsumerImpl
}
finally
{
- releaseSendLock();
+ _target.releaseSendLock();
}
}
}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 03e7eab61b..501ce40db7 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -84,6 +84,12 @@ class ManagementResponse implements MessageInstance
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ return _consumer;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return consumer == _consumer && !isDeleted();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0183c30276..9bdcb9e83f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -683,47 +683,48 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
int type = resolveAddressType(dest);
-
+ boolean resolved = false;
switch (type)
{
case AMQDestination.QUEUE_TYPE:
- {
+
+ setLegacyFieldsForQueueType(dest);
if(createNode)
{
- setLegacyFieldsForQueueType(dest);
handleQueueNodeCreation(dest,noLocal);
- break;
+ resolved = true;
}
else if (isQueueExist(dest,assertNode))
{
- setLegacyFieldsForQueueType(dest);
- break;
+ resolved = true;
}
- }
+ break;
case AMQDestination.TOPIC_TYPE:
- {
+
+ setLegacyFieldsForTopicType(dest);
if(createNode)
{
- setLegacyFieldsForTopicType(dest);
verifySubject(dest);
handleExchangeNodeCreation(dest);
- break;
+ resolved = true;
}
else if (isExchangeExist(dest,assertNode))
{
- setLegacyFieldsForTopicType(dest);
verifySubject(dest);
- break;
+ resolved = true;
}
- }
+ break;
default:
throw new AMQException(
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
- dest.setAddressResolved(System.currentTimeMillis());
+ if(resolved)
+ {
+ dest.setAddressResolved(System.currentTimeMillis());
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 68b7cf1f88..dc1f9a719e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -645,18 +645,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
Link link = destination.getLink();
if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
{
- arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs());
+ arguments.putAll(link.getSubscription().getArgs());
}
boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
+ String queue = queueName == null ? destination.getAddressName() : queueName.toString();
getQpidSession().messageSubscribe
- (queueName.toString(), String.valueOf(tag),
+ (queue, String.valueOf(tag),
acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
- String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
+ String consumerTag = (consumer).getConsumerTagString();
if (capacity == 0)
{
@@ -1175,7 +1176,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
catch(SessionException e)
{
- if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+ if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED
+ || e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND)
{
match = false;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 0145d15111..b0615ea99f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -514,6 +514,16 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
queueName = preprocessAddressTopic(consumer, queueName);
+ AMQDestination destination = consumer.getDestination();
+
+ Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
+ Link link = destination.getLink();
+ if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
+ {
+ arguments.putAll(link.getSubscription().getArgs());
+ }
+
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
new AMQShortString(String.valueOf(tag)),
@@ -521,7 +531,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
nowait,
- consumer.getArguments());
+ FieldTable.convertToFieldTable(arguments));
AMQFrame jmsConsume = body.generateFrame(getChannelId());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 99154e820f..91e2143c47 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -43,7 +43,8 @@ public class AddressHelper
public static final String LINK = "link";
public static final String X_DECLARE = "x-declare";
public static final String X_BINDINGS = "x-bindings";
- public static final String X_SUBSCRIBE = "x-subscribes";
+ public static final String X_SUBSCRIBES = "x-subscribes";
+ public static final String X_SUBSCRIBE = "x-subscribe";
public static final String CREATE = "create";
public static final String ASSERT = "assert";
public static final String DELETE = "delete";
@@ -265,19 +266,32 @@ public class AddressHelper
Map linkMap = (Map) _address.getOptions().get(LINK);
- if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE))
- {
- Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
-
- if (x_subscribe.containsKey(ARGUMENTS))
+ if (linkMap != null)
+ {
+ Map x_subscribe = null;
+
+ if(linkMap.containsKey(X_SUBSCRIBE))
{
- link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
+ x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+ }
+ else if(linkMap.containsKey(X_SUBSCRIBES))
+ {
+ // left in for backwards compatibility with old broken constant
+ x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBES);
+ }
+
+ if(x_subscribe != null)
+ {
+ if (x_subscribe.containsKey(ARGUMENTS))
+ {
+ link.getSubscription().setArgs((Map<String, Object>) x_subscribe.get(ARGUMENTS));
+ }
+
+ boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
+ Boolean.parseBoolean((String) x_subscribe.get(EXCLUSIVE)) : false;
+
+ link.getSubscription().setExclusive(exclusive);
}
-
- boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
- Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
-
- link.getSubscription().setExclusive(exclusive);
}
link.setBindings(getBindings(linkMap));
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index a614690f83..7e9cb3072a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -28,7 +28,7 @@ import java.util.Map;
import org.apache.qpid.client.AMQDestination.Binding;
public class Link
-{
+{
public enum FilterType { SQL92, XQUERY, SUBJECT }
public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 391498194b..6fef0cd553 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -96,25 +96,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
// create never --------------------------------------------
String addr1 = "ADDR:testQueue1";
AMQDestination dest = new AMQAnyDestination(addr1);
- final String queueErrorMessage = "The name 'testQueue1' supplied in the address " +
- "doesn't resolve to an exchange or a queue";
try
{
cons = jmsSession.createConsumer(dest);
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains(queueErrorMessage));
- }
+ // pass
+ _connection = getConnection();
+ jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ dest = new AMQAnyDestination(addr1);
- try
- {
- prod = jmsSession.createProducer(dest);
- }
- catch(JMSException e)
- {
- assertTrue(e.getCause().getMessage().contains(queueErrorMessage)
- || e.getCause().getCause().getMessage().contains(queueErrorMessage));
}
assertFalse("Queue should not be created",(
@@ -170,7 +162,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains(testQueue3ErrorMessage));
+ // pass
}
try
@@ -179,8 +171,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
catch(JMSException e)
{
- assertTrue(e.getCause().getMessage().contains(testQueue3ErrorMessage)
- || e.getCause().getCause().getMessage().contains(testQueue3ErrorMessage));
+ // pass
+ _connection = getConnection();
+ jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ dest = new AMQAnyDestination(addr1);
}
assertFalse("Queue should not be created",(
@@ -196,8 +190,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains(testQueue3ErrorMessage));
+ // pass
+ _connection = getConnection();
+ jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ dest = new AMQAnyDestination(addr1);
}
+
assertFalse("Queue should not be created",(
(AMQSession)jmsSession).isQueueExist(dest, false));
@@ -610,7 +608,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
// Using the ADDR method
// default case
- queue = ssn.createQueue("ADDR:my-queue2");
+ queue = ssn.createQueue("ADDR:my-queue2 ; { assert : sender }");
try
{
prod = ssn.createProducer(queue);
@@ -618,9 +616,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
catch(Exception e)
{
- String s = "The name 'my-queue2' supplied in the address " +
- "doesn't resolve to an exchange or a queue";
- assertTrue(s.equals(e.getCause().getMessage()) || s.equals(e.getCause().getCause().getMessage()));
+ String s = "Assert failed for queue : my-queue2";
+ assertTrue(e.getCause().getMessage().contains(s) || e.getCause().getCause().getMessage().contains(s));
}
// explicit create case