diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-11-04 22:49:52 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-11-04 22:49:52 +0000 |
| commit | c5004408ae40f20ffe980114a6f482bc6e10342e (patch) | |
| tree | 4b94d3d45d11b59f99a4693c1deb76cbec925af4 /java/client | |
| parent | 29b35a61611a020e6e458c27b2eddc6160726d2c (diff) | |
| download | qpid-python-c5004408ae40f20ffe980114a6f482bc6e10342e.tar.gz | |
QPID-2928 QPID-2737 The isExclusive method in BasicMessageConsumer_0_10.java takes the dest syntax into consideration when determining the exclusivity for a subscription. For destinations created using the addressing syntax, Queue subscriptions are non exclusive by default but are allowed to change it. Topic subscriptions are exclusive and is not allowed to change the value.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1031321 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
5 files changed, 87 insertions, 5 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 1d259eacce..bf04aa1c71 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -577,7 +577,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { boolean isTopic; - + Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) { isTopic = consumer.getDestination() instanceof AMQTopic || @@ -593,9 +594,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic preAcquire = !consumer.isNoConsume() && (isTopic || consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")); + + arguments.putAll( + (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs()); } - Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + getQpidSession().messageSubscribe (queueName.toString(), String.valueOf(tag), getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 862daec428..0a78403268 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -107,7 +107,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa /** * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ - private final boolean _exclusive; + protected boolean _exclusive; /** * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per @@ -182,7 +182,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; - + _synchronousQueue = new LinkedBlockingQueue(); _autoClose = autoClose; _noConsume = noConsume; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 35c0c66c7f..b5f3501e5a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -489,4 +489,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM clearReceiveQueue(); } } + + public boolean isExclusive() + { + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 64d5b16db0..00503cc650 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Link.Subscription; import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.messaging.address.Node.UnknownNodeType; @@ -264,6 +265,7 @@ public class AddressHelper public Link getLink() { Link link = new Link(); + link.setSubscription(new Subscription()); if (linkProps != null) { link.setDurable(linkProps.getBoolean(DURABLE) == null ? false @@ -283,7 +285,8 @@ public class AddressHelper .setProducerCapacity(capacityProps .getInt(CAPACITY_TARGET) == null ? 0 : capacityProps.getInt(CAPACITY_TARGET)); - } else + } + else { int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps .getInt(CAPACITY); @@ -292,6 +295,21 @@ public class AddressHelper } link.setFilter(linkProps.getString(FILTER)); // so far filter type not used + + if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) + { + Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); + + if (x_subscribe.containsKey(ARGUMENTS)) + { + link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS)); + } + + boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? + Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false; + + link.getSubscription().setExclusive(exclusive); + } } return link; diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 0ebcaf548b..a7d19d1bd5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.HashMap; +import java.util.Map; + import org.apache.qpid.client.messaging.address.Node.QueueNode; public class Link @@ -34,6 +37,7 @@ public class Link protected int _consumerCapacity = 0; protected int _producerCapacity = 0; protected Node node; + protected Subscription subscription; public Node getNode() { @@ -114,4 +118,40 @@ public class Link { this.name = name; } + + public Subscription getSubscription() + { + return this.subscription; + } + + public void setSubscription(Subscription subscription) + { + this.subscription = subscription; + } + + public static class Subscription + { + private Map<String,Object> args = new HashMap<String,Object>(); + private boolean exclusive = false; + + public Map<String, Object> getArgs() + { + return args; + } + + public void setArgs(Map<String, Object> args) + { + this.args = args; + } + + public boolean isExclusive() + { + return exclusive; + } + + public void setExclusive(boolean exclusive) + { + this.exclusive = exclusive; + } + } } |
