summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java40
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java15
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java7
3 files changed, 52 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 192164ca6b..0421a66abf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -20,8 +20,11 @@
*/
package org.apache.qpid.server.consumer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -37,6 +40,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
CopyOnWriteArraySet<>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private final AtomicInteger _stateActivates = new AtomicInteger();
protected AbstractConsumerTarget(final State initialState)
@@ -54,9 +58,40 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
{
if(_state.compareAndSet(from, to))
{
- for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ if (to == State.ACTIVE && _stateChangeListeners.size() > 1)
{
- listener.stateChanged(this, from, to);
+ int offset = _stateActivates.incrementAndGet();
+ if (offset >= _stateChangeListeners.size())
+ {
+ _stateActivates.set(0);
+ offset = 0;
+ }
+
+ List<StateChangeListener<ConsumerTarget, State>> holdovers = new ArrayList<>();
+ int pos = 0;
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ {
+ if (pos++ < offset)
+ {
+ holdovers.add(listener);
+ }
+ else
+ {
+ listener.stateChanged(this, from, to);
+ }
+ }
+ for (StateChangeListener<ConsumerTarget, State> listener : holdovers)
+ {
+ listener.stateChanged(this, from, to);
+ }
+
+ }
+ else
+ {
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ {
+ listener.stateChanged(this, from, to);
+ }
}
return true;
}
@@ -68,6 +103,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
public final void notifyCurrentState()
{
+
for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
{
State state = getState();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 12dd0b1e03..6c439993c7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -191,7 +191,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class));
- static final int MAX_ASYNC_DELIVERIES = 80;
+ private int _maxAsyncDeliveries;
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
@@ -445,6 +445,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
_messageGroupManager = null;
}
+ _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
updateAlertChecks();
}
@@ -1536,7 +1537,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return _virtualHost.getEventLogger();
}
-
public static interface QueueEntryFilter
{
public boolean accept(QueueEntry entry);
@@ -1892,7 +1892,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
boolean flushConsumer(QueueConsumer<?> sub, long iterations)
{
boolean atTail = false;
- final boolean keepSendLockHeld = iterations <= AbstractQueue.MAX_ASYNC_DELIVERIES;
+ final boolean keepSendLockHeld = iterations <= getMaxAsyncDeliveries();
boolean queueEmpty = false;
try
@@ -2128,7 +2128,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
boolean deliveryIncomplete = true;
boolean lastLoop = false;
- int iterations = MAX_ASYNC_DELIVERIES;
+ int iterations = getMaxAsyncDeliveries();
final int numSubs = _consumerList.size();
@@ -2956,6 +2956,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
_virtualHost.getSecurityManager().authoriseUpdate(this);
}
+ int getMaxAsyncDeliveries()
+ {
+ return _maxAsyncDeliveries;
+ }
+
+
+
private static final String[] NON_NEGATIVE_NUMBERS = {
ALERT_REPEAT_GAP,
ALERT_THRESHOLD_MESSAGE_AGE,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
index 750db4aba5..c860918a0b 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
@@ -49,7 +49,6 @@ class SubFlushRunner implements Runnable
private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
- private static final long ITERATIONS = AbstractQueue.MAX_ASYNC_DELIVERIES;
private final AtomicBoolean _stateChange = new AtomicBoolean();
public SubFlushRunner(QueueConsumerImpl sub)
@@ -70,7 +69,7 @@ class SubFlushRunner implements Runnable
_stateChange.set(false);
try
{
- complete = getQueue().flushConsumer(_sub, ITERATIONS);
+ complete = getQueue().flushConsumer(_sub, getQueue().getMaxAsyncDeliveries());
}
catch (ConnectionScopedRuntimeException | TransportException e)
{
@@ -102,9 +101,9 @@ class SubFlushRunner implements Runnable
}
}
- private AbstractQueue getQueue()
+ private AbstractQueue<?> getQueue()
{
- return (AbstractQueue) _sub.getQueue();
+ return (AbstractQueue<?>) _sub.getQueue();
}
public String toString()