From d8fa6da3799f8dcf17aa224f46a7c840f0f884d4 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 12 Oct 2012 17:17:41 +0000 Subject: QPID-3317 Modified the code to implement correct behavior for link bindings. Added unit tests for Address Helper and two specific test cases for verifying link behavior (bindings and customization of subscription queues). Review request : https://reviews.apache.org/r/7412/ git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1397651 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQDestination.java | 49 +--- .../java/org/apache/qpid/client/AMQSession.java | 10 +- .../org/apache/qpid/client/AMQSession_0_10.java | 303 +++++++++++++++------ .../org/apache/qpid/client/AMQSession_0_8.java | 5 +- .../main/java/org/apache/qpid/client/AMQTopic.java | 4 +- .../apache/qpid/client/BasicMessageConsumer.java | 3 - .../qpid/client/BasicMessageConsumer_0_10.java | 11 +- .../qpid/client/BasicMessageConsumer_0_8.java | 5 - .../qpid/client/BasicMessageProducer_0_10.java | 34 ++- .../client/message/AMQMessageDelegate_0_10.java | 2 +- .../client/messaging/address/AddressHelper.java | 203 ++++++-------- .../apache/qpid/client/messaging/address/Link.java | 90 +++++- .../apache/qpid/client/messaging/address/Node.java | 85 +++--- .../messaging/address/AddressHelperTest.java | 126 +++++++++ 14 files changed, 589 insertions(+), 341 deletions(-) create mode 100644 java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 096738f9ad..f14b6d810b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -112,16 +112,6 @@ public abstract class AMQDestination implements Destination, Referenceable _name = name; } - protected Link getTargetLink() - { - return _targetLink; - } - - protected void setTargetLink(Link targetLink) - { - _targetLink = targetLink; - } - // ----- Fields required to support new address syntax ------- public enum DestSyntax { @@ -186,9 +176,7 @@ public abstract class AMQDestination implements Destination, Referenceable private AddressOption _assert = AddressOption.NEVER; private AddressOption _delete = AddressOption.NEVER; - private Node _targetNode; - private Node _sourceNode; - private Link _targetLink; + private Node _node; private Link _link; @@ -823,24 +811,14 @@ public abstract class AMQDestination implements Destination, Referenceable _delete = option; } - public Node getTargetNode() + public Node getNode() { - return _targetNode; + return _node; } - public void setTargetNode(Node node) + public void setNode(Node node) { - _targetNode = node; - } - - public Node getSourceNode() - { - return _sourceNode; - } - - public void setSourceNode(Node node) - { - _sourceNode = node; + _node = node; } public Link getLink() @@ -901,21 +879,11 @@ public abstract class AMQDestination implements Destination, Referenceable _browseOnly = _addrHelper.isBrowseOnly(); - _addressType = _addrHelper.getTargetNodeType(); - _targetNode = _addrHelper.getTargetNode(_addressType); - _sourceNode = _addrHelper.getSourceNode(_addressType); + _addressType = _addrHelper.getNodeType(); + _node = _addrHelper.getNode(); _link = _addrHelper.getLink(); } - // This method is needed if we didn't know the node type at the beginning. - // Therefore we have to query the broker to figure out the type. - // Once the type is known we look for the necessary properties. - public void rebuildTargetAndSourceNodes(int addressType) - { - _targetNode = _addrHelper.getTargetNode(addressType); - _sourceNode = _addrHelper.getSourceNode(addressType); - } - // ----- / new address syntax ----------- public boolean isBrowseOnly() @@ -950,8 +918,7 @@ public abstract class AMQDestination implements Destination, Referenceable dest.setDelete(_delete); dest.setBrowseOnly(_browseOnly); dest.setAddressType(_addressType); - dest.setTargetNode(_targetNode); - dest.setSourceNode(_sourceNode); + dest.setNode(_node); dest.setLink(_link); dest.setAddressResolved(_addressResolved.get()); return dest; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 49964639e4..91a6389214 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -999,12 +999,11 @@ public abstract class AMQSession bindings = new ArrayList(); - bindings.addAll(destination.getSourceNode().getBindings()); - bindings.addAll(destination.getTargetNode().getBindings()); + bindings.addAll(destination.getNode().getBindings()); String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? destination.getAddressName(): "amq.topic"; @@ -599,7 +619,17 @@ public class AMQSession_0_10 extends AMQSession arguments = new HashMap(); arguments.putAll((Map) node.getDeclareArgs()); if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null) @@ -1065,11 +1092,12 @@ public class AMQSession_0_10 extends AMQSession arguments = queueProps.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + getQpidSession().queueDeclare(queueName, + queueProps.getAlternateExchange(), arguments, + queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + link.isDurable() ? Option.DURABLE : Option.NONE, + queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + getQpidSession().exchangeBind(queueName, dest.getAddressName(), dest.getSubject(), Collections.emptyMap()); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); } - public void setLegacyFiledsForQueueType(AMQDestination dest) + public void setLegacyFieldsForQueueType(AMQDestination dest) { // legacy support dest.setQueueName(new AMQShortString(dest.getAddressName())); @@ -1342,7 +1344,7 @@ public class AMQSession_0_10 extends AMQSession arguments = node.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + getQpidSession().queueDeclare(dest.getAddressName(), + node.getAlternateExchange(), arguments, + node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + node.isDurable() ? Option.DURABLE : Option.NONE, + node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + { + Node node = dest.getNode(); + sendExchangeDeclare(dest.getAddressName(), + node.getExchangeType(), + node.getAlternateExchange(), + node.getDeclareArgs(), + false, + node.isDurable(), + node.isAutoDelete()); + + // If bindings are specified without a queue name and is called by the producer, + // the broker will send an exception as expected. + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleLinkCreation(AMQDestination dest) throws AMQException + { + createBindings(dest, dest.getLink().getBindings()); + } + + void createBindings(AMQDestination dest, List bindings) + { + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (Binding binding: bindings) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + getQpidSession().exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + getQpidSession().exchangeUnbind(queue, exchange, + binding.getBindingKey()); + } + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest, false)) + { + getQpidSession().queueDelete(dest.getQueueName()); + } + } + + void handleNodeDelete(AMQDestination dest) throws AMQException + { + if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) + { + if (isExchangeExist(dest,false)) + { + getQpidSession().exchangeDelete(dest.getAddressName()); + } + } + else + { + if (isQueueExist(dest,false)) + { + getQpidSession().queueDelete(dest.getAddressName()); + } + } + } +} \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index ccae5e31e5..3097b33da3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -695,10 +695,9 @@ public class AMQSession_0_8 extends AMQSession extends Closeable implements Messa { sendCancel(); } - cleanupQueue(); } } catch (AMQException e) @@ -631,8 +630,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } abstract void sendCancel() throws AMQException, FailoverException; - - abstract void cleanupQueue() throws AMQException, FailoverException; /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ca5b1ac9c1..902770d901 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -95,6 +95,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer getBindings(Map props) { List bindings = new ArrayList(); - List bindingList = (List) props.get(X_BINDINGS); + List bindingList = (props == null) ? Collections.EMPTY_LIST : (List) props.get(X_BINDINGS); if (bindingList != null) { for (Map bindingMap : bindingList) @@ -157,117 +154,70 @@ public class AddressHelper } } - public int getTargetNodeType() throws Exception + public int getNodeType() throws Exception { - if (nodeProps == null || nodeProps.getString(TYPE) == null) + if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null) { // need to query and figure out return AMQDestination.UNKNOWN_TYPE; - } else if (nodeProps.getString(TYPE).equals("queue")) + } + else if (_nodePropAccess.getString(TYPE).equals("queue")) { return AMQDestination.QUEUE_TYPE; - } else if (nodeProps.getString(TYPE).equals("topic")) + } + else if (_nodePropAccess.getString(TYPE).equals("topic")) { return AMQDestination.TOPIC_TYPE; - } else + } + else { throw new Exception("unkown exchange type"); } } - public Node getTargetNode(int addressType) + public Node getNode() { - // target node here is the default exchange - if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE) - { - return new ExchangeNode(); - } else if (addressType == AMQDestination.TOPIC_TYPE) - { - Map node = (Map) address.getOptions().get(NODE); - return createExchangeNode(node); - } else + Node node = new Node(_address.getName()); + if (_nodePropAccess != null) { - // don't know yet - return null; - } - } - - private Node createExchangeNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - ExchangeNode node = new ExchangeNode(); - node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap - .getString(TYPE)); - fillInCommonNodeArgs(node, parent, argsMap); - return node; - } + Map xDeclareMap = getDeclareArgs(_nodePropMap); + MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); - private Node createQueueNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - QueueNode node = new QueueNode(); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false - : argsMap.getBoolean(EXCLUSIVE)); - fillInCommonNodeArgs(node, parent, argsMap); - - return node; - } - - private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap) - { - node.setDurable(getDurability(parent)); - node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false - : argsMap.getBoolean(AUTO_DELETE)); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setBindings(getBindings(parent)); - if (getDeclareArgs(parent).containsKey(ARGUMENTS)) - { - node.setDeclareArgs((Map)getDeclareArgs(parent).get(ARGUMENTS)); + node.setDurable(getBooleanProperty(_nodePropAccess,DURABLE,false)); + node.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,false)); + node.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,false)); + node.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); + if (xDeclareMapAccessor.getString(TYPE) != null) + { + node.setExchangeType(xDeclareMapAccessor.getString(TYPE)); + } + node.setBindings(getBindings(_nodePropMap)); + if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS)) + { + node.setDeclareArgs((Map)xDeclareMap.get(ARGUMENTS)); + } } - } - - private boolean getDurability(Map map) - { - Accessor access = new MapAccessor(map); - Boolean result = access.getBoolean(DURABLE); - return (result == null) ? false : result.booleanValue(); + return node; } - /** - * if the type == queue x-declare args from the node props is used. if the - * type == exchange x-declare args from the link props is used else just - * create a default temp queue. - */ - public Node getSourceNode(int addressType) + // This should really be in the Accessor interface + private boolean getBooleanProperty(Accessor access, String propName, boolean defaultValue) { - if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) - { - return createQueueNode((Map) address.getOptions().get(NODE)); - } - if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null) - { - return createQueueNode((Map) address.getOptions().get(LINK)); - } else - { - // need to query the info - return new QueueNode(); - } + Boolean result = access.getBoolean(propName); + return (result == null) ? defaultValue : result.booleanValue(); } public Link getLink() throws Exception { Link link = new Link(); link.setSubscription(new Subscription()); - if (linkProps != null) + link.setSubscriptionQueue(new SubscriptionQueue()); + if (_linkPropAccess != null) { - link.setDurable(linkProps.getBoolean(DURABLE) == null ? false - : linkProps.getBoolean(DURABLE)); - link.setName(linkProps.getString(NAME)); + link.setDurable(getBooleanProperty(_linkPropAccess,DURABLE,false)); + link.setName(_linkPropAccess.getString(NAME)); - String reliability = linkProps.getString(RELIABILITY); + String reliability = _linkPropAccess.getString(RELIABILITY); if ( reliability != null) { if (reliability.equalsIgnoreCase("unreliable")) @@ -283,13 +233,12 @@ public class AddressHelper throw new Exception("The reliability mode '" + reliability + "' is not yet supported"); } - } - if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) + if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) { MapAccessor capacityProps = new MapAccessor( - (Map) ((Map) address.getOptions().get(LINK)) + (Map) ((Map) _address.getOptions().get(LINK)) .get(CAPACITY)); link .setConsumerCapacity(capacityProps @@ -302,17 +251,19 @@ public class AddressHelper } else { - int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps + int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess .getInt(CAPACITY); link.setConsumerCapacity(cap); link.setProducerCapacity(cap); } - link.setFilter(linkProps.getString(FILTER)); + link.setFilter(_linkPropAccess.getString(FILTER)); // so far filter type not used - if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) + Map linkMap = (Map) _address.getOptions().get(LINK); + + if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE)) { - Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); + Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE); if (x_subscribe.containsKey(ARGUMENTS)) { @@ -324,6 +275,18 @@ public class AddressHelper link.getSubscription().setExclusive(exclusive); } + + link.setBindings(getBindings(linkMap)); + Map xDeclareMap = getDeclareArgs(linkMap); + SubscriptionQueue queue = link.getSubscriptionQueue(); + if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS)) + { + MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); + queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true)); + queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true)); + queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); + queue.setDeclareArgs((Map)xDeclareMap.get(ARGUMENTS)); + } } return link; diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 41f6725c8f..40a84ebd02 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.qpid.client.AMQDestination.Binding; + public class Link { public enum FilterType { SQL92, XQUERY, SUBJECT } @@ -36,10 +41,11 @@ public class Link private boolean _isDurable; private int _consumerCapacity = 0; private int _producerCapacity = 0; - private Node node; private Subscription subscription; private Reliability reliability = Reliability.AT_LEAST_ONCE; - + private List _bindings = new ArrayList(); + private SubscriptionQueue _subscriptionQueue; + public Reliability getReliability() { return reliability; @@ -50,21 +56,11 @@ public class Link this.reliability = reliability; } - public Node getNode() - { - return node; - } - - public void setNode(Node node) - { - this.node = node; - } - public boolean isDurable() { return _isDurable; } - + public void setDurable(boolean durable) { _isDurable = durable; @@ -139,6 +135,74 @@ public class Link { this.subscription = subscription; } + + public List getBindings() + { + return _bindings; + } + + public void setBindings(List bindings) + { + _bindings = bindings; + } + + public SubscriptionQueue getSubscriptionQueue() + { + return _subscriptionQueue; + } + + public void setSubscriptionQueue(SubscriptionQueue subscriptionQueue) + { + this._subscriptionQueue = subscriptionQueue; + } + + public static class SubscriptionQueue + { + private Map _declareArgs = new HashMap(); + private boolean _isAutoDelete = true; + private boolean _isExclusive = true; + private String _alternateExchange; + + public Map getDeclareArgs() + { + return _declareArgs; + } + + public void setDeclareArgs(Map options) + { + _declareArgs = options; + } + + public boolean isAutoDelete() + { + return _isAutoDelete; + } + + public void setAutoDelete(boolean autoDelete) + { + _isAutoDelete = autoDelete; + } + + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + + public String getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(String altExchange) + { + _alternateExchange = altExchange; + } + } public static class Subscription { diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java index 0da0327885..005f98f344 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -26,19 +26,33 @@ import org.apache.qpid.client.AMQDestination.Binding; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -public abstract class Node +public class Node { private int _nodeType = AMQDestination.UNKNOWN_TYPE; + private String _name; private boolean _isDurable; private boolean _isAutoDelete; + private boolean _isExclusive; private String _alternateExchange; + private String _exchangeType = "topic"; // used when node is an exchange instead of a queue. private List _bindings = new ArrayList(); - private Map _declareArgs = Collections.emptyMap(); + private Map _declareArgs = new HashMap(); - protected Node(int nodeType) + protected Node(String name) + { + _name = name; + } + + public String getName() + { + return _name; + } + + public void setNodeType(int nodeType) { _nodeType = nodeType; } @@ -58,6 +72,16 @@ public abstract class Node _isDurable = durable; } + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + public boolean isAutoDelete() { return _isAutoDelete; @@ -100,56 +124,15 @@ public abstract class Node public void setDeclareArgs(Map options) { _declareArgs = options; - } - - public static class QueueNode extends Node - { - private boolean _isExclusive; - private QpidQueueOptions _queueOptions = new QpidQueueOptions(); - - public QueueNode() - { - super(AMQDestination.QUEUE_TYPE); - } - - public boolean isExclusive() - { - return _isExclusive; - } - - public void setExclusive(boolean exclusive) - { - _isExclusive = exclusive; - } } - - public static class ExchangeNode extends Node - { - private QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); - private String _exchangeType; - - public ExchangeNode() - { - super(AMQDestination.TOPIC_TYPE); - } - - public String getExchangeType() - { - return _exchangeType; - } - - public void setExchangeType(String exchangeType) - { - _exchangeType = exchangeType; - } - + + public void setExchangeType(String type) + { + _exchangeType = type; } - - public static class UnknownNodeType extends Node + + public String getExchangeType() { - public UnknownNodeType() - { - super(AMQDestination.UNKNOWN_TYPE); - } + return _exchangeType; } } diff --git a/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java b/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java new file mode 100644 index 0000000000..a602dcbfd4 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java @@ -0,0 +1,126 @@ +package org.apache.qpid.client.messaging.address; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQDestination.AddressOption; +import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Link.Reliability; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.test.utils.QpidTestCase; + +public class AddressHelperTest extends QpidTestCase +{ + public void testAddressOptions() throws Exception + { + Address addr = Address.parse("queue/test;{create:sender, assert:always, delete:receiver, mode:browse}"); + AddressHelper helper = new AddressHelper(addr); + assertEquals(AddressOption.SENDER,AddressOption.getOption(helper.getCreate())); + assertEquals(AddressOption.ALWAYS,AddressOption.getOption(helper.getAssert())); + assertEquals(AddressOption.RECEIVER,AddressOption.getOption(helper.getDelete())); + assertTrue("'mode' option wasn't read properly",helper.isBrowseOnly()); + } + + public void testNodeProperties() throws Exception + { + Address addr = Address.parse("my-queue;{" + + "node: " + + "{" + + "type: queue ," + + "durable: true ," + + "x-declare: " + + "{" + + "exclusive: true," + + "auto-delete: true," + + "alternate-exchange: 'amq.fanout'," + + "arguments: {" + + "'qpid.max_size': 1000," + + "'qpid.max_count': 100" + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', queue:my-queue, key : test}, " + + "{exchange : 'amq.fanout', queue:my-queue}," + + "{exchange: 'amq.match', queue:my-queue, arguments: {x-match: any, dep: sales, loc: CA}}," + + "{exchange : 'amq.topic',queue:my-queue, key : 'a.#'}" + + "]" + + + "}" + + "}"); + AddressHelper helper = new AddressHelper(addr); + Node node = helper.getNode(); + assertEquals("'type' property wasn't read properly",AMQDestination.QUEUE_TYPE,helper.getNodeType()); + assertTrue("'durable' property wasn't read properly",node.isDurable()); + assertTrue("'auto-delete' property wasn't read properly",node.isAutoDelete()); + assertTrue("'exclusive' property wasn't read properly",node.isExclusive()); + assertEquals("'alternate-exchange' property wasn't read properly","amq.fanout",node.getAlternateExchange()); + assertEquals("'arguments' in 'x-declare' property wasn't read properly",2,node.getDeclareArgs().size()); + assertEquals("'bindings' property wasn't read properly",4,node.getBindings().size()); + for (Binding binding: node.getBindings()) + { + assertTrue("property 'exchange' in bindings wasn't read properly",binding.getExchange().startsWith("amq.")); + assertEquals("property 'queue' in bindings wasn't read properly","my-queue",binding.getQueue()); + if (binding.getExchange().equals("amq.direct")) + { + assertEquals("'key' property in bindings wasn't read properly","test",binding.getBindingKey()); + } + if (binding.getExchange().equals("amq.match")) + { + assertEquals("'arguments' property in bindings wasn't read properly",3,binding.getArgs().size()); + } + } + } + + public void testLinkProperties() throws Exception + { + Address addr = Address.parse("my-queue;{" + + "link: " + + "{" + + "name: my-queue ," + + "durable: true ," + + "reliability: at-least-once," + + "capacity: {source:10, target:15}," + + "x-declare: " + + "{" + + "exclusive: true," + + "auto-delete: true," + + "alternate-exchange: 'amq.fanout'," + + "arguments: {" + + "'qpid.max_size': 1000," + + "'qpid.max_count': 100" + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', queue:my-queue, key : test}, " + + "{exchange : 'amq.fanout', queue:my-queue}," + + "{exchange: 'amq.match', queue:my-queue, arguments: {x-match: any, dep: sales, loc: CA}}," + + "{exchange : 'amq.topic',queue:my-queue, key : 'a.#'}" + + "]," + + "x-subscribes:{exclusive: true, arguments: {a:b,x:y}}" + + "}" + + "}"); + + AddressHelper helper = new AddressHelper(addr); + Link link = helper.getLink(); + assertEquals("'name' property wasn't read properly","my-queue",link.getName()); + assertTrue("'durable' property wasn't read properly",link.isDurable()); + assertEquals("'reliability' property wasn't read properly",Reliability.AT_LEAST_ONCE,link.getReliability()); + assertTrue("'auto-delete' property in 'x-declare' wasn't read properly",link.getSubscriptionQueue().isAutoDelete()); + assertTrue("'exclusive' property in 'x-declare' wasn't read properly",link.getSubscriptionQueue().isExclusive()); + assertEquals("'alternate-exchange' property in 'x-declare' wasn't read properly","amq.fanout",link.getSubscriptionQueue().getAlternateExchange()); + assertEquals("'arguments' in 'x-declare' property wasn't read properly",2,link.getSubscriptionQueue().getDeclareArgs().size()); + assertEquals("'bindings' property wasn't read properly",4,link.getBindings().size()); + for (Binding binding: link.getBindings()) + { + assertTrue("property 'exchange' in bindings wasn't read properly",binding.getExchange().startsWith("amq.")); + assertEquals("property 'queue' in bindings wasn't read properly","my-queue",binding.getQueue()); + if (binding.getExchange().equals("amq.direct")) + { + assertEquals("'key' property in bindings wasn't read properly","test",binding.getBindingKey()); + } + if (binding.getExchange().equals("amq.match")) + { + assertEquals("'arguments' property in bindings wasn't read properly",3,binding.getArgs().size()); + } + } + assertTrue("'exclusive' property in 'x-subscribe' wasn't read properly",link.getSubscription().isExclusive()); + assertEquals("'arguments' in 'x-subscribe' property wasn't read properly",2,link.getSubscription().getArgs().size()); + } + +} -- cgit v1.2.1