summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-10-28 17:36:41 +0000
committerKeith Wall <kwall@apache.org>2014-10-28 17:36:41 +0000
commitc1edd56bd54dbcb6cb9a1fb1cafe04549682faf0 (patch)
tree2d2f862c49cac77f254d5e8d628f1f2ca2c8b956 /qpid/java
parent84270a757d3d3c4e801a6f5e683a1f3a5fdcf0e3 (diff)
downloadqpid-python-c1edd56bd54dbcb6cb9a1fb1cafe04549682faf0.tar.gz
QPID-6192: [Java Broker] Incorporate some feedbackk from Rob Godfrey.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1634929 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java32
2 files changed, 20 insertions, 16 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 29c3007eaf..2e7f3eee7f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -64,6 +64,7 @@ import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -509,8 +510,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
{
if (_virtualHost.getState() != State.ACTIVE)
{
- _logger.debug("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
- return 0;
+ throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
}
List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 8679e5c43a..12dd0b1e03 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -499,7 +499,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
try
{
- _asyncDelivery.execute(runnable);
+
+ if (_virtualHost.getState() != State.UNAVAILABLE)
+ {
+ _asyncDelivery.execute(runnable);
+ }
}
catch (RejectedExecutionException ree)
{
@@ -1923,12 +1927,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
sub.releaseSendLock();
}
}
-
- if (_virtualHost.getState() != State.ACTIVE)
- {
- _logger.debug("Subscription flush halted owing to virtualhost state " + _virtualHost.getState());
- return true;
- }
}
}
finally
@@ -1979,7 +1977,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
QueueEntry node = getNextAvailableEntry(sub);
- if (_virtualHost.getState() == State.ACTIVE && node != null && node.isAvailable())
+
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ throw new ConnectionScopedRuntimeException("Delivery halted owing to " +
+ "virtualhost state " + _virtualHost.getState());
+ }
+
+ if (node != null && node.isAvailable())
{
if (sub.hasInterest(node) && mightAssign(sub, node))
{
@@ -2185,12 +2190,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
sub.flushBatched();
break;
}
- if (_virtualHost.getState() != State.ACTIVE)
- {
- _logger.debug("Queue process halted owing to virtualhost state " + _virtualHost.getState());
-
- break;
- }
}
}
@@ -2544,6 +2543,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ }
+
if(!message.isReferenced(this))
{
txn.enqueue(this, message, new ServerTransaction.Action()