From 4ead64e1b150708fff3f54b3d5f06d82064c7d23 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 28 May 2010 03:18:52 +0000 Subject: 1. Capacity can now be specified as " capacity : {source: 5, target 10}" in addition to "capacity:10" where both source and target is set to 10. 2. If the exchange type if direct and no subject is set, then the routing_key is set to "" instead of throwing an exception. 3. Added a fix to infer the exchange type if specified in the x-declares section. 4. The link can now specify an optional 'name' parameter which will be used as the queue name if present. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@949083 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQDestination.java | 12 ++++----- .../org/apache/qpid/client/AMQSession_0_10.java | 17 ++++++------ .../qpid/client/BasicMessageConsumer_0_10.java | 4 +-- .../client/messaging/address/AddressHelper.java | 30 +++++++++++++++++----- .../apache/qpid/client/messaging/address/Link.java | 21 +++++++++++---- 5 files changed, 56 insertions(+), 28 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 593613f962..e1f29087a4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -139,7 +139,7 @@ public abstract class AMQDestination implements Destination, Referenceable protected Node _targetNode; protected Node _sourceNode; protected Link _targetLink; - protected Link _sourceLink; + protected Link _link; // ----- / Fields required to support new address syntax ------- static @@ -739,14 +739,14 @@ public abstract class AMQDestination implements Destination, Referenceable _sourceNode = node; } - public Link getSourceLink() + public Link getLink() { - return _sourceLink; + return _link; } - public void setSourceLink(Link link) + public void setLink(Link link) { - _sourceLink = link; + _link = link; } public void setExchangeName(AMQShortString name) @@ -792,7 +792,7 @@ public abstract class AMQDestination implements Destination, Referenceable _addressType = _addrHelper.getTargetNodeType(); _targetNode = _addrHelper.getTargetNode(_addressType); _sourceNode = _addrHelper.getSourceNode(_addressType); - _sourceLink = _addrHelper.getLink(); + _link = _addrHelper.getLink(); } // This method is needed if we didn't know the node type at the beginning. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 4aca7454bd..55cf5fe64b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -612,9 +612,9 @@ public class AMQSession_0_10 extends AMQSession 0) + destination.getLink().getConsumerCapacity() > 0) { - capacity = destination.getSourceLink().getCapacity(); + capacity = destination.getLink().getConsumerCapacity(); } else if (prefetch()) { @@ -1229,10 +1229,10 @@ public class AMQSession_0_10 extends AMQSession 0) + destination.getLink().getConsumerCapacity() > 0) { - capacity = destination.getSourceLink().getCapacity(); + capacity = destination.getLink().getConsumerCapacity(); } else if (getSession().prefetch()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 60890e3ef7..dc7aca7d3e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -58,6 +58,8 @@ public class AddressHelper public static final String BINDINGS = "bindings"; public static final String BROWSE_ONLY = "browse"; public static final String CAPACITY = "capacity"; + public static final String CAPACITY_SOURCE = "source"; + public static final String CAPACITY_TARGET = "target"; public static final String NAME = "name"; public static final String EXCHANGE = "exchange"; public static final String QUEUE = "queue"; @@ -220,11 +222,7 @@ public class AddressHelper { return AMQDestination.QUEUE_TYPE; } - else if ((nodeProps.getString(TYPE).equals("topic") || - nodeProps.getString(TYPE).equals("direct") || - nodeProps.getString(TYPE).equals("fanout") || - nodeProps.getString(TYPE).equals("match") || - nodeProps.getString(TYPE).equals("xml")) ) + else if (nodeProps.getString(TYPE).equals("topic")) { return AMQDestination.TOPIC_TYPE; } @@ -258,7 +256,8 @@ public class AddressHelper Map declareArgs = getDeclareArgs(parent); MapAccessor argsMap = new MapAccessor(declareArgs); ExchangeNode node = new ExchangeNode(); - node.setExchangeType(nodeProps.getString(TYPE)); + node.setExchangeType(argsMap.getString(TYPE) == null? + "topic":argsMap.getString(TYPE)); node.setDeclareArgs(getQpidExchangeOptions(argsMap)); fillInCommonNodeArgs(node,parent,argsMap); return node; @@ -285,6 +284,11 @@ public class AddressHelper node.setBindings(getBindings(parent)); } + /** + * if the type == queue x-declare args from the node props is used. + * if the type == exchange x-declare args from the link props is used + * else just create a default temp queue. + */ public Node getSourceNode(int addressType) { if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) @@ -309,7 +313,19 @@ public class AddressHelper { link.setDurable(linkProps.getBoolean(DURABLE)== null? false : linkProps.getBoolean(DURABLE)); link.setName(linkProps.getString(NAME)); - link.setCapacity(linkProps.getInt(CAPACITY)); + + if (((Map)address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) + { + MapAccessor capacityProps = new MapAccessor( + (Map)((Map)address.getOptions().get(LINK)).get(CAPACITY)); + link.setConsumerCapacity(capacityProps.getInt(CAPACITY_SOURCE)); + link.setProducerCapacity(capacityProps.getInt(CAPACITY_TARGET)); + } + else + { + link.setConsumerCapacity(linkProps.getInt(CAPACITY)); + link.setProducerCapacity(linkProps.getInt(CAPACITY)); + } link.setFilter(linkProps.getString(FILTER)); // so far filter type not used } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 367191e74e..0ebcaf548b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -31,7 +31,8 @@ public class Link protected FilterType _filterType = FilterType.SUBJECT; protected boolean _isNoLocal; protected boolean _isDurable; - protected int _capacity = 0; + protected int _consumerCapacity = 0; + protected int _producerCapacity = 0; protected Node node; public Node getNode() @@ -84,16 +85,26 @@ public class Link _isNoLocal = noLocal; } - public int getCapacity() + public int getConsumerCapacity() { - return _capacity; + return _consumerCapacity; } - public void setCapacity(int capacity) + public void setConsumerCapacity(int capacity) { - this._capacity = capacity; + _consumerCapacity = capacity; + } + + public int getProducerCapacity() + { + return _producerCapacity; } + public void setProducerCapacity(int capacity) + { + _producerCapacity = capacity; + } + public String getName() { return name; -- cgit v1.2.1