summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-06-30 12:31:49 +0000
committerKeith Wall <kwall@apache.org>2012-06-30 12:31:49 +0000
commit74379046f7bd8a787c9c5f32df75f1ca2a39438a (patch)
treec4510ad223f29cef5f6a2c7a69fc85aa20cb1e35 /qpid/java/broker/src
parent9b2699d8a9b257e17d26495d19590f61d06c05c7 (diff)
downloadqpid-python-74379046f7bd8a787c9c5f32df75f1ca2a39438a.tar.gz
QPID-4902: NPE from SimpleAMQQueue and RejectedExecutionExecution handling
Guard against NPE in setLastSeenEntry. #execute() method change to ignore REE in the case where the queue has already been stopped (logged at ERROR otherwise). Change Subscription*#_queueContext member to volatile as this member is get/set from different threads during the queue's lifecycle git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1355721 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java31
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java2
5 files changed, 29 insertions, 22 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index b4195d7e5a..ad8eafc850 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -67,7 +67,7 @@ class Subscription_1_0 implements Subscription
private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
private final long _id;
private final boolean _acquires;
- private AMQQueue.Context _queueContext;
+ private volatile AMQQueue.Context _queueContext;
private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
private ReentrantLock _stateChangeLock = new ReentrantLock();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 3d54bba48f..242ad24048 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -296,7 +297,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void execute(Runnable runnable)
{
- _asyncDelivery.execute(runnable);
+ try
+ {
+ _asyncDelivery.execute(runnable);
+ }
+ catch (RejectedExecutionException ree)
+ {
+ if (_stopped.get())
+ {
+ // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
+ }
+ else
+ {
+ _logger.error("Unexpected rejected execution", ree);
+ throw ree;
+ }
+ }
}
public AMQShortString getNameShortString()
@@ -863,12 +879,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
{
QueueContext subContext = (QueueContext) sub.getQueueContext();
- QueueEntry releasedEntry = subContext.getReleasedEntry();
-
- QueueContext._lastSeenUpdater.set(subContext, entry);
- if(releasedEntry == entry)
+ if (subContext != null)
{
- QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+ QueueEntry releasedEntry = subContext.getReleasedEntry();
+
+ QueueContext._lastSeenUpdater.set(subContext, entry);
+ if(releasedEntry == entry)
+ {
+ QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index baf5d09c95..0d648862b1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -76,7 +76,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private AMQQueue.Context _queueContext;
+ private volatile AMQQueue.Context _queueContext;
private final ClientDeliveryMethod _deliveryMethod;
private final RecordDeliveryMethod _recordMethod;
@@ -470,11 +470,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
_deleted.set(true);
}
- public boolean filtersMessages()
- {
- return _filters != null || _noLocal;
- }
-
public boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
@@ -510,13 +505,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
- private String id = String.valueOf(System.identityHashCode(this));
-
- private String debugIdentity()
- {
- return id;
- }
-
private boolean checkFilters(QueueEntry msg)
{
return (_filters == null) || _filters.allAllow(msg);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index db378f2bf3..df648b88ae 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -98,7 +98,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private AMQQueue.Context _queueContext;
+ private volatile AMQQueue.Context _queueContext;
private final AtomicBoolean _deleted = new AtomicBoolean(false);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 0aa0569b07..363eb5ecab 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -48,7 +48,7 @@ public class MockSubscription implements Subscription
private AMQShortString tag = new AMQShortString("mocktag");
private AMQQueue queue = null;
private StateListener _listener = null;
- private AMQQueue.Context _queueContext = null;
+ private volatile AMQQueue.Context _queueContext = null;
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();