diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-31 13:36:25 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-31 13:36:25 +0000 |
| commit | db3522d576b5d9cbfb23ba40f3c5ba2b8501429b (patch) | |
| tree | 09b751b2e1801d0378f83acf779f5bdf8b2d8aee /qpid/java | |
| parent | 24d10be23d2368868c56e0053030972e6b15e275 (diff) | |
| download | qpid-python-db3522d576b5d9cbfb23ba40f3c5ba2b8501429b.tar.gz | |
QPID-6204 : [Java Broker] Improve distribution fairness for multi-queue consumers
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1635768 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
3 files changed, 52 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index 192164ca6b..0421a66abf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -20,8 +20,11 @@ */ package org.apache.qpid.server.consumer; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -37,6 +40,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget CopyOnWriteArraySet<>(); private final Lock _stateChangeLock = new ReentrantLock(); + private final AtomicInteger _stateActivates = new AtomicInteger(); protected AbstractConsumerTarget(final State initialState) @@ -54,9 +58,40 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget { if(_state.compareAndSet(from, to)) { - for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners) + if (to == State.ACTIVE && _stateChangeListeners.size() > 1) { - listener.stateChanged(this, from, to); + int offset = _stateActivates.incrementAndGet(); + if (offset >= _stateChangeListeners.size()) + { + _stateActivates.set(0); + offset = 0; + } + + List<StateChangeListener<ConsumerTarget, State>> holdovers = new ArrayList<>(); + int pos = 0; + for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners) + { + if (pos++ < offset) + { + holdovers.add(listener); + } + else + { + listener.stateChanged(this, from, to); + } + } + for (StateChangeListener<ConsumerTarget, State> listener : holdovers) + { + listener.stateChanged(this, from, to); + } + + } + else + { + for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners) + { + listener.stateChanged(this, from, to); + } } return true; } @@ -68,6 +103,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget public final void notifyCurrentState() { + for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners) { State state = getState(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 12dd0b1e03..6c439993c7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -191,7 +191,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class)); - static final int MAX_ASYNC_DELIVERIES = 80; + private int _maxAsyncDeliveries; private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); @@ -445,6 +445,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> _messageGroupManager = null; } + _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES); updateAlertChecks(); } @@ -1536,7 +1537,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> return _virtualHost.getEventLogger(); } - public static interface QueueEntryFilter { public boolean accept(QueueEntry entry); @@ -1892,7 +1892,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> boolean flushConsumer(QueueConsumer<?> sub, long iterations) { boolean atTail = false; - final boolean keepSendLockHeld = iterations <= AbstractQueue.MAX_ASYNC_DELIVERIES; + final boolean keepSendLockHeld = iterations <= getMaxAsyncDeliveries(); boolean queueEmpty = false; try @@ -2128,7 +2128,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> boolean deliveryIncomplete = true; boolean lastLoop = false; - int iterations = MAX_ASYNC_DELIVERIES; + int iterations = getMaxAsyncDeliveries(); final int numSubs = _consumerList.size(); @@ -2956,6 +2956,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> _virtualHost.getSecurityManager().authoriseUpdate(this); } + int getMaxAsyncDeliveries() + { + return _maxAsyncDeliveries; + } + + + private static final String[] NON_NEGATIVE_NUMBERS = { ALERT_REPEAT_GAP, ALERT_THRESHOLD_MESSAGE_AGE, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index 750db4aba5..c860918a0b 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -49,7 +49,6 @@ class SubFlushRunner implements Runnable private final AtomicInteger _scheduled = new AtomicInteger(IDLE); - private static final long ITERATIONS = AbstractQueue.MAX_ASYNC_DELIVERIES; private final AtomicBoolean _stateChange = new AtomicBoolean(); public SubFlushRunner(QueueConsumerImpl sub) @@ -70,7 +69,7 @@ class SubFlushRunner implements Runnable _stateChange.set(false); try { - complete = getQueue().flushConsumer(_sub, ITERATIONS); + complete = getQueue().flushConsumer(_sub, getQueue().getMaxAsyncDeliveries()); } catch (ConnectionScopedRuntimeException | TransportException e) { @@ -102,9 +101,9 @@ class SubFlushRunner implements Runnable } } - private AbstractQueue getQueue() + private AbstractQueue<?> getQueue() { - return (AbstractQueue) _sub.getQueue(); + return (AbstractQueue<?>) _sub.getQueue(); } public String toString() |
