diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-12 20:25:53 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-12 20:25:53 +0000 |
| commit | 762aa71a99d13cb3f7efd29cb95098eadafb5396 (patch) | |
| tree | 86cf705813a21b0dde2393b799571fe48c2d0d46 /qpid/java/broker-core/src | |
| parent | e243745d439671418016a2be1570209269b45070 (diff) | |
| download | qpid-python-762aa71a99d13cb3f7efd29cb95098eadafb5396.tar.gz | |
merged from trunk r1659391
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
3 files changed, 16 insertions, 3 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index 3b196df902..fd6f3385c6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -39,7 +39,8 @@ public interface ConsumerImpl SEES_REQUEUES, TRANSIENT, EXCLUSIVE, - NO_LOCAL + NO_LOCAL, + DURABLE } long getBytesOut(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index d3ce911406..65e8a1358d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -246,6 +246,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>(); private final QueueRunner _queueRunner = new QueueRunner(this); + private boolean _closing; protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) { @@ -754,6 +755,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } + @Override + protected void beforeClose() + { + _closing = true; + super.beforeClose(); + } + + + synchronized void unregisterConsumer(final QueueConsumerImpl consumer) { if (consumer == null) @@ -794,7 +804,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if(!consumer.isTransient() && ( getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS ) - && getConsumerCount() == 0) + && getConsumerCount() == 0 + && !(consumer.isDurable() && _closing)) { if (_logger.isInfoEnabled()) @@ -1794,6 +1805,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { ReferenceCountingExecutorService.getInstance().releaseExecutorService(); } + _closing = false; } public void checkCapacity(AMQSessionModel channel) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 450d4d98d5..4ffb868537 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -153,7 +153,7 @@ class QueueConsumerImpl attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE)); attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL)); attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY"); - attributes.put(DURABLE,false); + attributes.put(DURABLE,optionSet.contains(Option.DURABLE)); attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END); if(filters != null) { |
