diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2014-07-08 23:18:46 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2014-07-08 23:18:46 +0000 |
| commit | 905d329f94390de37a6166f41e736943abee4c8a (patch) | |
| tree | 62aa8513057121cc102569215e601c030b09561d /qpid/java/client/src | |
| parent | 790ce3196c5cef5a138bae29fa7ac9f03fee5b1b (diff) | |
| download | qpid-python-905d329f94390de37a6166f41e736943abee4c8a.tar.gz | |
QPID-5870 A Consumer is now marked if it's using a durable subscription.
The topic subscription queue is now deleted when the subscription ends unless it's marked as a durable-topic-subscription.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1608971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
4 files changed, 35 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2ca93a567e..a37b532617 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1105,6 +1105,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { C consumer = (C) createConsumer(dest, messageSelector, noLocal); + consumer.markAsDurableSubscriber(); subscriber = new TopicSubscriberAdaptor<C>(dest, consumer); // Save subscription information diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 60d48f02ef..cb8f81f68f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1143,26 +1143,35 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { + Node node = dest.getNode(); + return isQueueExist(dest.getAddressName(), assertNode, + node.isDurable(), node.isAutoDelete(), + node.isExclusive(), node.getDeclareArgs()); + } + + public boolean isQueueExist(String queueName, boolean assertNode, + boolean durable, boolean autoDelete, + boolean exclusive, Map<String, Object> args) throws AMQException + { boolean match = true; try { - QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); - match = dest.getAddressName().equals(result.getQueue()); - Node node = dest.getNode(); + QueueQueryResult result = getQpidSession().queueQuery(queueName, Option.NONE).get(); + match = queueName.equals(result.getQueue()); if (match && assertNode) { - match = (result.getDurable() == node.isDurable()) && - (result.getAutoDelete() == node.isAutoDelete()) && - (result.getExclusive() == node.isExclusive()) && - (matchProps(result.getArguments(),node.getDeclareArgs())); + match = (result.getDurable() == durable) && + (result.getAutoDelete() == autoDelete) && + (result.getExclusive() == exclusive) && + (matchProps(result.getArguments(),args)); } if (assertNode) { if (!match) { - throw new AMQException("Assert failed for address : " + dest +", Result was : " + result); + throw new AMQException("Assert failed for queue : " + queueName +", Result was : " + result); } } } @@ -1596,7 +1605,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // We need to delete the subscription queue. if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && dest.getLink().getSubscriptionQueue().isExclusive() && - isQueueExist(dest, false)) + isQueueExist(dest.getQueueName(), false, false, false, false, null)) { getQpidSession().queueDelete(dest.getQueueName()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index b5e008da5a..5086063a5a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -134,6 +134,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa private final boolean _browseOnly; private List<StackTraceElement> _closedStack = null; + private boolean _isDurableSubscriber = false; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -1035,4 +1036,14 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { return _messageFactory; } + + protected boolean isDurableSubscriber() + { + return _isDurableSubscriber; + } + + protected void markAsDurableSubscriber() + { + _isDurableSubscriber = true; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ef7b8cc217..658fb25ce4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -509,11 +509,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.RECEIVER ) { - ((AMQSession_0_10) getSession()).handleNodeDelete(dest); - ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest); + ((AMQSession_0_10) getSession()).handleNodeDelete(dest); } // Subscription queue is handled as part of linkDelete method. ((AMQSession_0_10) getSession()).handleLinkDelete(dest); + if (!isDurableSubscriber()) + { + ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest); + } } } |
