From c5004408ae40f20ffe980114a6f482bc6e10342e Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 4 Nov 2010 22:49:52 +0000 Subject: QPID-2928 QPID-2737 The isExclusive method in BasicMessageConsumer_0_10.java takes the dest syntax into consideration when determining the exclusivity for a subscription. For destinations created using the addressing syntax, Queue subscriptions are non exclusive by default but are allowed to change it. Topic subscriptions are exclusive and is not allowed to change the value. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1031321 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSession_0_10.java | 8 +++-- .../apache/qpid/client/BasicMessageConsumer.java | 4 +-- .../qpid/client/BasicMessageConsumer_0_10.java | 20 +++++++++++ .../client/messaging/address/AddressHelper.java | 20 ++++++++++- .../apache/qpid/client/messaging/address/Link.java | 40 ++++++++++++++++++++++ 5 files changed, 87 insertions(+), 5 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 1d259eacce..bf04aa1c71 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -577,7 +577,8 @@ public class AMQSession_0_10 extends AMQSession arguments = FieldTable.convertToMap(consumer.getArguments()); + if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { isTopic = consumer.getDestination() instanceof AMQTopic || @@ -593,9 +594,12 @@ public class AMQSession_0_10 extends AMQSession) consumer.getDestination().getLink().getSubscription().getArgs()); } - Map arguments = FieldTable.convertToMap(consumer.getArguments()); + getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 862daec428..0a78403268 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -107,7 +107,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa /** * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ - private final boolean _exclusive; + protected boolean _exclusive; /** * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per @@ -182,7 +182,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; - + _synchronousQueue = new LinkedBlockingQueue(); _autoClose = autoClose; _noConsume = noConsume; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 35c0c66c7f..b5f3501e5a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -489,4 +489,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer)x_subscribe.get(ARGUMENTS)); + } + + boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? + Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false; + + link.getSubscription().setExclusive(exclusive); + } } return link; diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 0ebcaf548b..a7d19d1bd5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.HashMap; +import java.util.Map; + import org.apache.qpid.client.messaging.address.Node.QueueNode; public class Link @@ -34,6 +37,7 @@ public class Link protected int _consumerCapacity = 0; protected int _producerCapacity = 0; protected Node node; + protected Subscription subscription; public Node getNode() { @@ -114,4 +118,40 @@ public class Link { this.name = name; } + + public Subscription getSubscription() + { + return this.subscription; + } + + public void setSubscription(Subscription subscription) + { + this.subscription = subscription; + } + + public static class Subscription + { + private Map args = new HashMap(); + private boolean exclusive = false; + + public Map getArgs() + { + return args; + } + + public void setArgs(Map args) + { + this.args = args; + } + + public boolean isExclusive() + { + return exclusive; + } + + public void setExclusive(boolean exclusive) + { + this.exclusive = exclusive; + } + } } -- cgit v1.2.1