summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-26 17:01:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-26 17:01:07 +0000
commit1635ca5849b7c765d5d7be9cd01d46b06349f320 (patch)
treef304353182e02369661b8ecfdde357a288b183e3 /qpid/java/broker-core/src
parent8328e0398707d2cccdadb95fe1b4c4563b930cc1 (diff)
downloadqpid-python-1635ca5849b7c765d5d7be9cd01d46b06349f320.tar.gz
QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620659 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java50
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java20
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java8
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java12
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java32
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java5
11 files changed, 145 insertions, 48 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index aa721e598a..192164ca6b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -20,16 +20,24 @@
*/
package org.apache.qpid.server.consumer;
-import org.apache.qpid.server.util.StateChangeListener;
-
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpid.server.util.StateChangeListener;
public abstract class AbstractConsumerTarget implements ConsumerTarget
{
private final AtomicReference<State> _state;
- private final AtomicReference<StateChangeListener<ConsumerTarget, State>> _stateListener =
- new AtomicReference<StateChangeListener<ConsumerTarget, State>>();
+
+ private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
+ CopyOnWriteArraySet<>();
+
+ private final Lock _stateChangeLock = new ReentrantLock();
+
protected AbstractConsumerTarget(final State initialState)
{
@@ -46,8 +54,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
{
if(_state.compareAndSet(from, to))
{
- StateChangeListener<ConsumerTarget, State> listener = _stateListener.get();
- if(listener != null)
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
{
listener.stateChanged(this, from, to);
}
@@ -59,15 +66,38 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
}
}
+ public final void notifyCurrentState()
+ {
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ {
+ State state = getState();
+ listener.stateChanged(this, state, state);
+ }
+ }
+ public final void addStateListener(StateChangeListener<ConsumerTarget, State> listener)
+ {
+ _stateChangeListeners.add(listener);
+ }
+
+ @Override
+ public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
+ {
+ _stateChangeListeners.remove(listener);
+ }
+
+ public final boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
- public final void setStateListener(StateChangeListener<ConsumerTarget, State> listener)
+ public final void getSendLock()
{
- _stateListener.set(listener);
+ _stateChangeLock.lock();
}
- public final StateChangeListener<ConsumerTarget, State> getStateListener()
+ public final void releaseSendLock()
{
- return _stateListener.get();
+ _stateChangeLock.unlock();
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index b7be1bfd9b..5aef922da5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -31,6 +31,8 @@ public interface ConsumerTarget
void acquisitionRemoved(MessageInstance node);
+ void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
@@ -42,7 +44,7 @@ public interface ConsumerTarget
void consumerRemoved(ConsumerImpl sub);
- void setStateListener(StateChangeListener<ConsumerTarget, State> listener);
+ void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
long getUnacknowledgedBytes();
@@ -50,7 +52,7 @@ public interface ConsumerTarget
AMQSessionModel getSessionModel();
- long send(MessageInstance entry, boolean batch);
+ long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
void flushBatched();
@@ -65,4 +67,11 @@ public interface ConsumerTarget
boolean isSuspended();
boolean close();
+
+ boolean trySendLock();
+
+ void getSendLock();
+
+ void releaseSendLock();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 1bf451948d..d3518f428b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -79,6 +79,8 @@ public interface MessageInstance
Filterable asFilterable();
+ ConsumerImpl getAcquiringConsumer();
+
public static enum State
{
AVAILABLE,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index 71b7636159..02cd7ff56f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Consumer;
@@ -52,4 +53,6 @@ public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl,
MessageInstance.ConsumerAcquiredState<X> getOwningState();
QueueContext getQueueContext();
+
+ ConsumerTarget getTarget();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 4044c938db..60bad7bf1c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -31,8 +31,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
@@ -65,7 +63,6 @@ class QueueConsumerImpl
private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _consumerNumber;
- private final Lock _stateChangeLock = new ReentrantLock();
private final long _createTime = System.currentTimeMillis();
private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this);
private final boolean _acquires;
@@ -90,6 +87,8 @@ class QueueConsumerImpl
private final ConsumerTarget _target;
private final SubFlushRunner _runner = new SubFlushRunner(this);
+ private final StateChangeListener<ConsumerTarget, ConsumerTarget.State>
+ _listener;
private volatile QueueContext _queueContext;
private StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
{
@@ -134,17 +133,17 @@ class QueueConsumerImpl
setupLogging();
- _target.setStateListener(
- new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- targetStateChanged(oldState, newState);
- }
- });
+ _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
+ {
+ @Override
+ public void stateChanged(final ConsumerTarget object,
+ final ConsumerTarget.State oldState,
+ final ConsumerTarget.State newState)
+ {
+ targetStateChanged(oldState, newState);
+ }
+ };
+ _target.addStateListener(_listener);
}
private static Map<String, Object> createAttributeMap(String name, FilterManager filters, EnumSet<Option> optionSet)
@@ -202,6 +201,7 @@ class QueueConsumerImpl
}
}
+ @Override
public ConsumerTarget getTarget()
{
return _target;
@@ -248,17 +248,17 @@ class QueueConsumerImpl
{
if(_closed.compareAndSet(false,true))
{
- getSendLock();
+ _target.getSendLock();
try
{
- _target.close();
_target.consumerRemoved(this);
+ _target.removeStateChangeListener(_listener);
_queue.unregisterConsumer(this);
deleted();
}
finally
{
- releaseSendLock();
+ _target.releaseSendLock();
}
}
@@ -420,17 +420,17 @@ class QueueConsumerImpl
public final boolean trySendLock()
{
- return _stateChangeLock.tryLock();
+ return getTarget().trySendLock();
}
public final void getSendLock()
{
- _stateChangeLock.lock();
+ getTarget().getSendLock();
}
public final void releaseSendLock()
{
- _stateChangeLock.unlock();
+ getTarget().releaseSendLock();
}
public final long getCreateTime()
@@ -471,7 +471,7 @@ class QueueConsumerImpl
public final void send(final QueueEntry entry, final boolean batch)
{
_deliveredCount.incrementAndGet();
- long size = _target.send(entry, batch);
+ long size = _target.send(this, entry, batch);
_deliveredBytes.addAndGet(size);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 6c541d78ef..a0f2dc798d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -247,6 +247,26 @@ public abstract class QueueEntryImpl implements QueueEntry
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ ConsumerImpl consumer;
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState)
+ {
+ consumer = ((ConsumerAcquiredState)state).getConsumer();
+ }
+ else if(state instanceof LockedAcquiredState)
+ {
+ consumer = ((LockedAcquiredState)state).getConsumer();
+ }
+ else
+ {
+ consumer = null;
+ }
+ return consumer;
+ }
+
+ @Override
public boolean isAcquiredBy(ConsumerImpl consumer)
{
EntryState state = _state;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
index 30f6fa66c6..e784126d24 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -21,18 +21,18 @@
package org.apache.qpid.server.queue;
import java.security.PrivilegedAction;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.Subject;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
-import javax.security.auth.Subject;
-
/**
* QueueRunners are Runnables used to process a queue when requiring
* asynchronous message delivery to consumers, which is necessary
@@ -111,7 +111,7 @@ public class QueueRunner implements Runnable
public String toString()
{
- return "QueueRunner-" + _queue.getLogSubject();
+ return "QueueRunner-" + _queue.getLogSubject().toLogString();
}
public void execute()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index 910727ce42..750db4aba5 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -21,18 +21,18 @@
package org.apache.qpid.server.queue;
+import java.security.PrivilegedAction;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.security.auth.Subject;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
-import javax.security.auth.Subject;
-import java.security.PrivilegedAction;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
class SubFlushRunner implements Runnable
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index ad33ecadcf..1482576748 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -167,7 +167,7 @@ public class MockConsumer implements ConsumerTarget
{
}
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
long size = entry.getMessage().getSize();
if (messages.contains(entry))
@@ -202,7 +202,7 @@ public class MockConsumer implements ConsumerTarget
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
-
+ close();
}
public void setState(State state)
@@ -216,11 +216,20 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public void setStateListener(final StateChangeListener<ConsumerTarget, State> listener)
+ public void addStateListener(final StateChangeListener<ConsumerTarget, State> listener)
{
_listener = listener;
}
+ @Override
+ public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
+ {
+ if(_listener == listener)
+ {
+ _listener = null;
+ }
+ }
+
public ArrayList<MessageInstance> getMessages()
{
return messages;
@@ -242,6 +251,23 @@ public class MockConsumer implements ConsumerTarget
_isActive = isActive;
}
+
+ public final boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
+ public final void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public final void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+
private static class MockSessionModel implements AMQSessionModel
{
private final UUID _id = UUID.randomUUID();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 37c4eeb127..08c0de95d5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -60,6 +60,12 @@ public class MockMessageInstance implements MessageInstance
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index f13886d2b2..7fc36f39bb 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -205,12 +205,13 @@ public class StandardQueueTest extends AbstractQueueTestBase
{
/**
* Send a message and decrement latch
+ * @param consumer
* @param entry
* @param batch
*/
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
- long size = super.send(entry, batch);
+ long size = super.send(consumer, entry, batch);
latch.countDown();
return size;
}