diff options
| author | Keith Wall <kwall@apache.org> | 2012-06-30 12:31:49 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-06-30 12:31:49 +0000 |
| commit | 74379046f7bd8a787c9c5f32df75f1ca2a39438a (patch) | |
| tree | c4510ad223f29cef5f6a2c7a69fc85aa20cb1e35 /qpid/java/broker/src | |
| parent | 9b2699d8a9b257e17d26495d19590f61d06c05c7 (diff) | |
| download | qpid-python-74379046f7bd8a787c9c5f32df75f1ca2a39438a.tar.gz | |
QPID-4902: NPE from SimpleAMQQueue and RejectedExecutionExecution handling
Guard against NPE in setLastSeenEntry. #execute() method change to ignore REE in the case where the
queue has already been stopped (logged at ERROR otherwise). Change Subscription*#_queueContext member
to volatile as this member is get/set from different threads during the queue's lifecycle
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1355721 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
5 files changed, 29 insertions, 22 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index b4195d7e5a..ad8eafc850 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -67,7 +67,7 @@ class Subscription_1_0 implements Subscription private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this); private final long _id; private final boolean _acquires; - private AMQQueue.Context _queueContext; + private volatile AMQQueue.Context _queueContext; private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); private ReentrantLock _stateChangeLock = new ReentrantLock(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 3d54bba48f..242ad24048 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -296,7 +297,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes public void execute(Runnable runnable) { - _asyncDelivery.execute(runnable); + try + { + _asyncDelivery.execute(runnable); + } + catch (RejectedExecutionException ree) + { + if (_stopped.get()) + { + // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped. + } + else + { + _logger.error("Unexpected rejected execution", ree); + throw ree; + } + } } public AMQShortString getNameShortString() @@ -863,12 +879,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private void setLastSeenEntry(final Subscription sub, final QueueEntry entry) { QueueContext subContext = (QueueContext) sub.getQueueContext(); - QueueEntry releasedEntry = subContext.getReleasedEntry(); - - QueueContext._lastSeenUpdater.set(subContext, entry); - if(releasedEntry == entry) + if (subContext != null) { - QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null); + QueueEntry releasedEntry = subContext.getReleasedEntry(); + + QueueContext._lastSeenUpdater.set(subContext, entry); + if(releasedEntry == entry) + { + QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index baf5d09c95..0d648862b1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -76,7 +76,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private AMQQueue.Context _queueContext; + private volatile AMQQueue.Context _queueContext; private final ClientDeliveryMethod _deliveryMethod; private final RecordDeliveryMethod _recordMethod; @@ -470,11 +470,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _deleted.set(true); } - public boolean filtersMessages() - { - return _filters != null || _noLocal; - } - public boolean hasInterest(QueueEntry entry) { //check that the message hasn't been rejected @@ -510,13 +505,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } - private String id = String.valueOf(System.identityHashCode(this)); - - private String debugIdentity() - { - return id; - } - private boolean checkFilters(QueueEntry msg) { return (_filters == null) || _filters.allAllow(msg); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index db378f2bf3..df648b88ae 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -98,7 +98,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final Lock _stateChangeLock = new ReentrantLock(); private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private AMQQueue.Context _queueContext; + private volatile AMQQueue.Context _queueContext; private final AtomicBoolean _deleted = new AtomicBoolean(false); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 0aa0569b07..363eb5ecab 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -48,7 +48,7 @@ public class MockSubscription implements Subscription private AMQShortString tag = new AMQShortString("mocktag"); private AMQQueue queue = null; private StateListener _listener = null; - private AMQQueue.Context _queueContext = null; + private volatile AMQQueue.Context _queueContext = null; private State _state = State.ACTIVE; private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); private final Lock _stateChangeLock = new ReentrantLock(); |
