summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java2
3 files changed, 16 insertions, 3 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index 3b196df902..fd6f3385c6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -39,7 +39,8 @@ public interface ConsumerImpl
SEES_REQUEUES,
TRANSIENT,
EXCLUSIVE,
- NO_LOCAL
+ NO_LOCAL,
+ DURABLE
}
long getBytesOut();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index d3ce911406..65e8a1358d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -246,6 +246,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
private final QueueRunner _queueRunner = new QueueRunner(this);
+ private boolean _closing;
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
@@ -754,6 +755,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
+ @Override
+ protected void beforeClose()
+ {
+ _closing = true;
+ super.beforeClose();
+ }
+
+
+
synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
{
if (consumer == null)
@@ -794,7 +804,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if(!consumer.isTransient()
&& ( getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
|| getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS )
- && getConsumerCount() == 0)
+ && getConsumerCount() == 0
+ && !(consumer.isDurable() && _closing))
{
if (_logger.isInfoEnabled())
@@ -1794,6 +1805,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
ReferenceCountingExecutorService.getInstance().releaseExecutorService();
}
+ _closing = false;
}
public void checkCapacity(AMQSessionModel channel)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 450d4d98d5..4ffb868537 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -153,7 +153,7 @@ class QueueConsumerImpl
attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE));
attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL));
attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY");
- attributes.put(DURABLE,false);
+ attributes.put(DURABLE,optionSet.contains(Option.DURABLE));
attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
if(filters != null)
{