summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java32
2 files changed, 20 insertions, 16 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 29c3007eaf..2e7f3eee7f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -64,6 +64,7 @@ import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -509,8 +510,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
{
if (_virtualHost.getState() != State.ACTIVE)
{
- _logger.debug("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
- return 0;
+ throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
}
List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 8679e5c43a..12dd0b1e03 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -499,7 +499,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
try
{
- _asyncDelivery.execute(runnable);
+
+ if (_virtualHost.getState() != State.UNAVAILABLE)
+ {
+ _asyncDelivery.execute(runnable);
+ }
}
catch (RejectedExecutionException ree)
{
@@ -1923,12 +1927,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
sub.releaseSendLock();
}
}
-
- if (_virtualHost.getState() != State.ACTIVE)
- {
- _logger.debug("Subscription flush halted owing to virtualhost state " + _virtualHost.getState());
- return true;
- }
}
}
finally
@@ -1979,7 +1977,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
QueueEntry node = getNextAvailableEntry(sub);
- if (_virtualHost.getState() == State.ACTIVE && node != null && node.isAvailable())
+
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ throw new ConnectionScopedRuntimeException("Delivery halted owing to " +
+ "virtualhost state " + _virtualHost.getState());
+ }
+
+ if (node != null && node.isAvailable())
{
if (sub.hasInterest(node) && mightAssign(sub, node))
{
@@ -2185,12 +2190,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
sub.flushBatched();
break;
}
- if (_virtualHost.getState() != State.ACTIVE)
- {
- _logger.debug("Queue process halted owing to virtualhost state " + _virtualHost.getState());
-
- break;
- }
}
}
@@ -2544,6 +2543,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ }
+
if(!message.isReferenced(this))
{
txn.enqueue(this, message, new ServerTransaction.Action()