diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-05-28 03:18:52 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-05-28 03:18:52 +0000 |
| commit | 02d8c94d844fc05e329a501cc33296380963d19b (patch) | |
| tree | 95ce5e873806ade62d509fa6fcfbfd5c9cc68e32 /java/client/src/main | |
| parent | 920f7ed7c4a52b2544a13e40dd7952846cf993b3 (diff) | |
| download | qpid-python-02d8c94d844fc05e329a501cc33296380963d19b.tar.gz | |
1. Capacity can now be specified as " capacity : {source: 5, target 10}" in addition to "capacity:10" where both source and target is set to 10.
2. If the exchange type if direct and no subject is set, then the routing_key is set to "" instead of throwing an exception.
3. Added a fix to infer the exchange type if specified in the x-declares section.
4. The link can now specify an optional 'name' parameter which will be used as the queue name if present.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@949083 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
5 files changed, 56 insertions, 28 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 593613f962..e1f29087a4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -139,7 +139,7 @@ public abstract class AMQDestination implements Destination, Referenceable protected Node _targetNode; protected Node _sourceNode; protected Link _targetLink; - protected Link _sourceLink; + protected Link _link; // ----- / Fields required to support new address syntax ------- static @@ -739,14 +739,14 @@ public abstract class AMQDestination implements Destination, Referenceable _sourceNode = node; } - public Link getSourceLink() + public Link getLink() { - return _sourceLink; + return _link; } - public void setSourceLink(Link link) + public void setLink(Link link) { - _sourceLink = link; + _link = link; } public void setExchangeName(AMQShortString name) @@ -792,7 +792,7 @@ public abstract class AMQDestination implements Destination, Referenceable _addressType = _addrHelper.getTargetNodeType(); _targetNode = _addrHelper.getTargetNode(_addressType); _sourceNode = _addrHelper.getSourceNode(_addressType); - _sourceLink = _addrHelper.getLink(); + _link = _addrHelper.getLink(); } // This method is needed if we didn't know the node type at the beginning. diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 4aca7454bd..55cf5fe64b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -612,9 +612,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { long capacity = 0; if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getSourceLink().getCapacity() > 0) + destination.getLink().getConsumerCapacity() > 0) { - capacity = destination.getSourceLink().getCapacity(); + capacity = destination.getLink().getConsumerCapacity(); } else if (prefetch()) { @@ -1229,10 +1229,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setRoutingKey(ExchangeDefaults.WILDCARD_ANY); dest.setSubject(ExchangeDefaults.WILDCARD_ANY.toString()); } - else if (dest.getExchangeClass() == ExchangeDefaults.DIRECT_EXCHANGE_CLASS) + else { - throw new AMQException("If sending to an exchange of type direct," + - " a valid subject must be specified"); + dest.setRoutingKey(new AMQShortString("")); + dest.setSubject(""); } } } @@ -1242,9 +1242,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null if (dest.getQueueName() == null || !isQueueExist(dest,node,true)) { - // can name : my-queue be used in x-declare? - // if so set it to dest queue name - // if (dest.getQueueName() == null) { dest.setName(node.getName()) } + if (dest.getLink() != null && dest.getLink().getName() != null) + { + dest.setQueueName(new AMQShortString(dest.getLink().getName())); + } send0_10QueueDeclare(dest,null,false,false); } node.addBinding(new Binding(dest.getAddressName(), diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 321f5855d7..9d597d8290 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -106,9 +106,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM // Destination setting overrides connection defaults if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getSourceLink().getCapacity() > 0) + destination.getLink().getConsumerCapacity() > 0) { - capacity = destination.getSourceLink().getCapacity(); + capacity = destination.getLink().getConsumerCapacity(); } else if (getSession().prefetch()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 60890e3ef7..dc7aca7d3e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -58,6 +58,8 @@ public class AddressHelper public static final String BINDINGS = "bindings"; public static final String BROWSE_ONLY = "browse"; public static final String CAPACITY = "capacity"; + public static final String CAPACITY_SOURCE = "source"; + public static final String CAPACITY_TARGET = "target"; public static final String NAME = "name"; public static final String EXCHANGE = "exchange"; public static final String QUEUE = "queue"; @@ -220,11 +222,7 @@ public class AddressHelper { return AMQDestination.QUEUE_TYPE; } - else if ((nodeProps.getString(TYPE).equals("topic") || - nodeProps.getString(TYPE).equals("direct") || - nodeProps.getString(TYPE).equals("fanout") || - nodeProps.getString(TYPE).equals("match") || - nodeProps.getString(TYPE).equals("xml")) ) + else if (nodeProps.getString(TYPE).equals("topic")) { return AMQDestination.TOPIC_TYPE; } @@ -258,7 +256,8 @@ public class AddressHelper Map declareArgs = getDeclareArgs(parent); MapAccessor argsMap = new MapAccessor(declareArgs); ExchangeNode node = new ExchangeNode(); - node.setExchangeType(nodeProps.getString(TYPE)); + node.setExchangeType(argsMap.getString(TYPE) == null? + "topic":argsMap.getString(TYPE)); node.setDeclareArgs(getQpidExchangeOptions(argsMap)); fillInCommonNodeArgs(node,parent,argsMap); return node; @@ -285,6 +284,11 @@ public class AddressHelper node.setBindings(getBindings(parent)); } + /** + * if the type == queue x-declare args from the node props is used. + * if the type == exchange x-declare args from the link props is used + * else just create a default temp queue. + */ public Node getSourceNode(int addressType) { if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) @@ -309,7 +313,19 @@ public class AddressHelper { link.setDurable(linkProps.getBoolean(DURABLE)== null? false : linkProps.getBoolean(DURABLE)); link.setName(linkProps.getString(NAME)); - link.setCapacity(linkProps.getInt(CAPACITY)); + + if (((Map)address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) + { + MapAccessor capacityProps = new MapAccessor( + (Map)((Map)address.getOptions().get(LINK)).get(CAPACITY)); + link.setConsumerCapacity(capacityProps.getInt(CAPACITY_SOURCE)); + link.setProducerCapacity(capacityProps.getInt(CAPACITY_TARGET)); + } + else + { + link.setConsumerCapacity(linkProps.getInt(CAPACITY)); + link.setProducerCapacity(linkProps.getInt(CAPACITY)); + } link.setFilter(linkProps.getString(FILTER)); // so far filter type not used } diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 367191e74e..0ebcaf548b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -31,7 +31,8 @@ public class Link protected FilterType _filterType = FilterType.SUBJECT; protected boolean _isNoLocal; protected boolean _isDurable; - protected int _capacity = 0; + protected int _consumerCapacity = 0; + protected int _producerCapacity = 0; protected Node node; public Node getNode() @@ -84,16 +85,26 @@ public class Link _isNoLocal = noLocal; } - public int getCapacity() + public int getConsumerCapacity() { - return _capacity; + return _consumerCapacity; } - public void setCapacity(int capacity) + public void setConsumerCapacity(int capacity) { - this._capacity = capacity; + _consumerCapacity = capacity; + } + + public int getProducerCapacity() + { + return _producerCapacity; } + public void setProducerCapacity(int capacity) + { + _producerCapacity = capacity; + } + public String getName() { return name; |
