summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2014-07-08 23:18:46 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2014-07-08 23:18:46 +0000
commit905d329f94390de37a6166f41e736943abee4c8a (patch)
tree62aa8513057121cc102569215e601c030b09561d /qpid/java/client/src
parent790ce3196c5cef5a138bae29fa7ac9f03fee5b1b (diff)
downloadqpid-python-905d329f94390de37a6166f41e736943abee4c8a.tar.gz
QPID-5870 A Consumer is now marked if it's using a durable subscription.
The topic subscription queue is now deleted when the subscription ends unless it's marked as a durable-topic-subscription. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1608971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java27
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java7
4 files changed, 35 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 2ca93a567e..a37b532617 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1105,6 +1105,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
C consumer = (C) createConsumer(dest, messageSelector, noLocal);
+ consumer.markAsDurableSubscriber();
subscriber = new TopicSubscriberAdaptor<C>(dest, consumer);
// Save subscription information
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 60d48f02ef..cb8f81f68f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1143,26 +1143,35 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
{
+ Node node = dest.getNode();
+ return isQueueExist(dest.getAddressName(), assertNode,
+ node.isDurable(), node.isAutoDelete(),
+ node.isExclusive(), node.getDeclareArgs());
+ }
+
+ public boolean isQueueExist(String queueName, boolean assertNode,
+ boolean durable, boolean autoDelete,
+ boolean exclusive, Map<String, Object> args) throws AMQException
+ {
boolean match = true;
try
{
- QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
- match = dest.getAddressName().equals(result.getQueue());
- Node node = dest.getNode();
+ QueueQueryResult result = getQpidSession().queueQuery(queueName, Option.NONE).get();
+ match = queueName.equals(result.getQueue());
if (match && assertNode)
{
- match = (result.getDurable() == node.isDurable()) &&
- (result.getAutoDelete() == node.isAutoDelete()) &&
- (result.getExclusive() == node.isExclusive()) &&
- (matchProps(result.getArguments(),node.getDeclareArgs()));
+ match = (result.getDurable() == durable) &&
+ (result.getAutoDelete() == autoDelete) &&
+ (result.getExclusive() == exclusive) &&
+ (matchProps(result.getArguments(),args));
}
if (assertNode)
{
if (!match)
{
- throw new AMQException("Assert failed for address : " + dest +", Result was : " + result);
+ throw new AMQException("Assert failed for queue : " + queueName +", Result was : " + result);
}
}
}
@@ -1596,7 +1605,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// We need to delete the subscription queue.
if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
dest.getLink().getSubscriptionQueue().isExclusive() &&
- isQueueExist(dest, false))
+ isQueueExist(dest.getQueueName(), false, false, false, false, null))
{
getQpidSession().queueDelete(dest.getQueueName());
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index b5e008da5a..5086063a5a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -134,6 +134,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
private final boolean _browseOnly;
private List<StackTraceElement> _closedStack = null;
+ private boolean _isDurableSubscriber = false;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -1035,4 +1036,14 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
return _messageFactory;
}
+
+ protected boolean isDurableSubscriber()
+ {
+ return _isDurableSubscriber;
+ }
+
+ protected void markAsDurableSubscriber()
+ {
+ _isDurableSubscriber = true;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index ef7b8cc217..658fb25ce4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -509,11 +509,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.RECEIVER )
{
- ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
- ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
+ ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
}
// Subscription queue is handled as part of linkDelete method.
((AMQSession_0_10) getSession()).handleLinkDelete(dest);
+ if (!isDurableSubscriber())
+ {
+ ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
+ }
}
}