diff options
| author | Keith Wall <kwall@apache.org> | 2014-10-28 17:36:41 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-10-28 17:36:41 +0000 |
| commit | c1edd56bd54dbcb6cb9a1fb1cafe04549682faf0 (patch) | |
| tree | 2d2f862c49cac77f254d5e8d628f1f2ca2c8b956 /qpid/java | |
| parent | 84270a757d3d3c4e801a6f5e683a1f3a5fdcf0e3 (diff) | |
| download | qpid-python-c1edd56bd54dbcb6cb9a1fb1cafe04549682faf0.tar.gz | |
QPID-6192: [Java Broker] Incorporate some feedbackk from Rob Godfrey.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1634929 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java | 4 | ||||
| -rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java | 32 |
2 files changed, 20 insertions, 16 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 29c3007eaf..2e7f3eee7f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -64,6 +64,7 @@ import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; @@ -509,8 +510,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> { if (_virtualHost.getState() != State.ACTIVE) { - _logger.debug("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent"); - return 0; + throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent"); } List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 8679e5c43a..12dd0b1e03 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -499,7 +499,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { try { - _asyncDelivery.execute(runnable); + + if (_virtualHost.getState() != State.UNAVAILABLE) + { + _asyncDelivery.execute(runnable); + } } catch (RejectedExecutionException ree) { @@ -1923,12 +1927,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> sub.releaseSendLock(); } } - - if (_virtualHost.getState() != State.ACTIVE) - { - _logger.debug("Subscription flush halted owing to virtualhost state " + _virtualHost.getState()); - return true; - } } } finally @@ -1979,7 +1977,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> QueueEntry node = getNextAvailableEntry(sub); - if (_virtualHost.getState() == State.ACTIVE && node != null && node.isAvailable()) + + if (_virtualHost.getState() != State.ACTIVE) + { + throw new ConnectionScopedRuntimeException("Delivery halted owing to " + + "virtualhost state " + _virtualHost.getState()); + } + + if (node != null && node.isAvailable()) { if (sub.hasInterest(node) && mightAssign(sub, node)) { @@ -2185,12 +2190,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> sub.flushBatched(); break; } - if (_virtualHost.getState() != State.ACTIVE) - { - _logger.debug("Queue process halted owing to virtualhost state " + _virtualHost.getState()); - - break; - } } } @@ -2544,6 +2543,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> final ServerTransaction txn, final Action<? super MessageInstance> postEnqueueAction) { + if (_virtualHost.getState() != State.ACTIVE) + { + throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent"); + } + if(!message.isReferenced(this)) { txn.enqueue(this, message, new ServerTransaction.Action() |
