diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-26 17:01:07 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-26 17:01:07 +0000 |
| commit | 1635ca5849b7c765d5d7be9cd01d46b06349f320 (patch) | |
| tree | f304353182e02369661b8ecfdde357a288b183e3 /qpid/java/broker-core/src | |
| parent | 8328e0398707d2cccdadb95fe1b4c4563b930cc1 (diff) | |
| download | qpid-python-1635ca5849b7c765d5d7be9cd01d46b06349f320.tar.gz | |
QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620659 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
11 files changed, 145 insertions, 48 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index aa721e598a..192164ca6b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -20,16 +20,24 @@ */ package org.apache.qpid.server.consumer; -import org.apache.qpid.server.util.StateChangeListener; - +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpid.server.util.StateChangeListener; public abstract class AbstractConsumerTarget implements ConsumerTarget { private final AtomicReference<State> _state; - private final AtomicReference<StateChangeListener<ConsumerTarget, State>> _stateListener = - new AtomicReference<StateChangeListener<ConsumerTarget, State>>(); + + private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new + CopyOnWriteArraySet<>(); + + private final Lock _stateChangeLock = new ReentrantLock(); + protected AbstractConsumerTarget(final State initialState) { @@ -46,8 +54,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget { if(_state.compareAndSet(from, to)) { - StateChangeListener<ConsumerTarget, State> listener = _stateListener.get(); - if(listener != null) + for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners) { listener.stateChanged(this, from, to); } @@ -59,15 +66,38 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget } } + public final void notifyCurrentState() + { + for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners) + { + State state = getState(); + listener.stateChanged(this, state, state); + } + } + public final void addStateListener(StateChangeListener<ConsumerTarget, State> listener) + { + _stateChangeListeners.add(listener); + } + + @Override + public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener) + { + _stateChangeListeners.remove(listener); + } + + public final boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } - public final void setStateListener(StateChangeListener<ConsumerTarget, State> listener) + public final void getSendLock() { - _stateListener.set(listener); + _stateChangeLock.lock(); } - public final StateChangeListener<ConsumerTarget, State> getStateListener() + public final void releaseSendLock() { - return _stateListener.get(); + _stateChangeLock.unlock(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index b7be1bfd9b..5aef922da5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -31,6 +31,8 @@ public interface ConsumerTarget void acquisitionRemoved(MessageInstance node); + void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener); + enum State { ACTIVE, SUSPENDED, CLOSED @@ -42,7 +44,7 @@ public interface ConsumerTarget void consumerRemoved(ConsumerImpl sub); - void setStateListener(StateChangeListener<ConsumerTarget, State> listener); + void addStateListener(StateChangeListener<ConsumerTarget, State> listener); long getUnacknowledgedBytes(); @@ -50,7 +52,7 @@ public interface ConsumerTarget AMQSessionModel getSessionModel(); - long send(MessageInstance entry, boolean batch); + long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch); void flushBatched(); @@ -65,4 +67,11 @@ public interface ConsumerTarget boolean isSuspended(); boolean close(); + + boolean trySendLock(); + + void getSendLock(); + + void releaseSendLock(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 1bf451948d..d3518f428b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -79,6 +79,8 @@ public interface MessageInstance Filterable asFilterable(); + ConsumerImpl getAcquiringConsumer(); + public static enum State { AVAILABLE, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 71b7636159..02cd7ff56f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.model.Consumer; @@ -52,4 +53,6 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, MessageInstance.ConsumerAcquiredState<X> getOwningState(); QueueContext getQueueContext(); + + ConsumerTarget getTarget(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 4044c938db..60bad7bf1c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -31,8 +31,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; @@ -65,7 +63,6 @@ class QueueConsumerImpl private final AtomicBoolean _targetClosed = new AtomicBoolean(false); private final AtomicBoolean _closed = new AtomicBoolean(false); private final long _consumerNumber; - private final Lock _stateChangeLock = new ReentrantLock(); private final long _createTime = System.currentTimeMillis(); private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this); private final boolean _acquires; @@ -90,6 +87,8 @@ class QueueConsumerImpl private final ConsumerTarget _target; private final SubFlushRunner _runner = new SubFlushRunner(this); + private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> + _listener; private volatile QueueContext _queueContext; private StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>() { @@ -134,17 +133,17 @@ class QueueConsumerImpl setupLogging(); - _target.setStateListener( - new StateChangeListener<ConsumerTarget, ConsumerTarget.State>() - { - @Override - public void stateChanged(final ConsumerTarget object, - final ConsumerTarget.State oldState, - final ConsumerTarget.State newState) - { - targetStateChanged(oldState, newState); - } - }); + _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>() + { + @Override + public void stateChanged(final ConsumerTarget object, + final ConsumerTarget.State oldState, + final ConsumerTarget.State newState) + { + targetStateChanged(oldState, newState); + } + }; + _target.addStateListener(_listener); } private static Map<String, Object> createAttributeMap(String name, FilterManager filters, EnumSet<Option> optionSet) @@ -202,6 +201,7 @@ class QueueConsumerImpl } } + @Override public ConsumerTarget getTarget() { return _target; @@ -248,17 +248,17 @@ class QueueConsumerImpl { if(_closed.compareAndSet(false,true)) { - getSendLock(); + _target.getSendLock(); try { - _target.close(); _target.consumerRemoved(this); + _target.removeStateChangeListener(_listener); _queue.unregisterConsumer(this); deleted(); } finally { - releaseSendLock(); + _target.releaseSendLock(); } } @@ -420,17 +420,17 @@ class QueueConsumerImpl public final boolean trySendLock() { - return _stateChangeLock.tryLock(); + return getTarget().trySendLock(); } public final void getSendLock() { - _stateChangeLock.lock(); + getTarget().getSendLock(); } public final void releaseSendLock() { - _stateChangeLock.unlock(); + getTarget().releaseSendLock(); } public final long getCreateTime() @@ -471,7 +471,7 @@ class QueueConsumerImpl public final void send(final QueueEntry entry, final boolean batch) { _deliveredCount.incrementAndGet(); - long size = _target.send(entry, batch); + long size = _target.send(this, entry, batch); _deliveredBytes.addAndGet(size); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 6c541d78ef..a0f2dc798d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -247,6 +247,26 @@ public abstract class QueueEntryImpl implements QueueEntry } @Override + public ConsumerImpl getAcquiringConsumer() + { + ConsumerImpl consumer; + EntryState state = _state; + if(state instanceof ConsumerAcquiredState) + { + consumer = ((ConsumerAcquiredState)state).getConsumer(); + } + else if(state instanceof LockedAcquiredState) + { + consumer = ((LockedAcquiredState)state).getConsumer(); + } + else + { + consumer = null; + } + return consumer; + } + + @Override public boolean isAcquiredBy(ConsumerImpl consumer) { EntryState state = _state; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index 30f6fa66c6..e784126d24 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -21,18 +21,18 @@ package org.apache.qpid.server.queue; import java.security.PrivilegedAction; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import javax.security.auth.Subject; + import org.apache.log4j.Logger; + import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.TransportException; -import javax.security.auth.Subject; - /** * QueueRunners are Runnables used to process a queue when requiring * asynchronous message delivery to consumers, which is necessary @@ -111,7 +111,7 @@ public class QueueRunner implements Runnable public String toString() { - return "QueueRunner-" + _queue.getLogSubject(); + return "QueueRunner-" + _queue.getLogSubject().toLogString(); } public void execute() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index 910727ce42..750db4aba5 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -21,18 +21,18 @@ package org.apache.qpid.server.queue; +import java.security.PrivilegedAction; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.security.auth.Subject; + import org.apache.log4j.Logger; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.TransportException; -import javax.security.auth.Subject; -import java.security.PrivilegedAction; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - class SubFlushRunner implements Runnable { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index ad33ecadcf..1482576748 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -167,7 +167,7 @@ public class MockConsumer implements ConsumerTarget { } - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { long size = entry.getMessage().getSize(); if (messages.contains(entry)) @@ -202,7 +202,7 @@ public class MockConsumer implements ConsumerTarget @Override public void consumerRemoved(final ConsumerImpl sub) { - + close(); } public void setState(State state) @@ -216,11 +216,20 @@ public class MockConsumer implements ConsumerTarget } @Override - public void setStateListener(final StateChangeListener<ConsumerTarget, State> listener) + public void addStateListener(final StateChangeListener<ConsumerTarget, State> listener) { _listener = listener; } + @Override + public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener) + { + if(_listener == listener) + { + _listener = null; + } + } + public ArrayList<MessageInstance> getMessages() { return messages; @@ -242,6 +251,23 @@ public class MockConsumer implements ConsumerTarget _isActive = isActive; } + + public final boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } + + public final void getSendLock() + { + _stateChangeLock.lock(); + } + + public final void releaseSendLock() + { + _stateChangeLock.unlock(); + } + + private static class MockSessionModel implements AMQSessionModel { private final UUID _id = UUID.randomUUID(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java index 37c4eeb127..08c0de95d5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java @@ -60,6 +60,12 @@ public class MockMessageInstance implements MessageInstance } @Override + public ConsumerImpl getAcquiringConsumer() + { + return null; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index f13886d2b2..7fc36f39bb 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -205,12 +205,13 @@ public class StandardQueueTest extends AbstractQueueTestBase { /** * Send a message and decrement latch + * @param consumer * @param entry * @param batch */ - public long send(MessageInstance entry, boolean batch) + public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { - long size = super.send(entry, batch); + long size = super.send(consumer, entry, batch); latch.countDown(); return size; } |
