summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-12 20:25:53 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-12 20:25:53 +0000
commit762aa71a99d13cb3f7efd29cb95098eadafb5396 (patch)
tree86cf705813a21b0dde2393b799571fe48c2d0d46 /qpid/java/broker-core/src
parente243745d439671418016a2be1570209269b45070 (diff)
downloadqpid-python-762aa71a99d13cb3f7efd29cb95098eadafb5396.tar.gz
merged from trunk r1659391
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java2
3 files changed, 16 insertions, 3 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index 3b196df902..fd6f3385c6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -39,7 +39,8 @@ public interface ConsumerImpl
SEES_REQUEUES,
TRANSIENT,
EXCLUSIVE,
- NO_LOCAL
+ NO_LOCAL,
+ DURABLE
}
long getBytesOut();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index d3ce911406..65e8a1358d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -246,6 +246,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
private final QueueRunner _queueRunner = new QueueRunner(this);
+ private boolean _closing;
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
@@ -754,6 +755,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
+ @Override
+ protected void beforeClose()
+ {
+ _closing = true;
+ super.beforeClose();
+ }
+
+
+
synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
{
if (consumer == null)
@@ -794,7 +804,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if(!consumer.isTransient()
&& ( getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
|| getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS )
- && getConsumerCount() == 0)
+ && getConsumerCount() == 0
+ && !(consumer.isDurable() && _closing))
{
if (_logger.isInfoEnabled())
@@ -1794,6 +1805,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
ReferenceCountingExecutorService.getInstance().releaseExecutorService();
}
+ _closing = false;
}
public void checkCapacity(AMQSessionModel channel)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 450d4d98d5..4ffb868537 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -153,7 +153,7 @@ class QueueConsumerImpl
attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE));
attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL));
attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY");
- attributes.put(DURABLE,false);
+ attributes.put(DURABLE,optionSet.contains(Option.DURABLE));
attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
if(filters != null)
{