diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-10-12 17:17:41 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-10-12 17:17:41 +0000 |
| commit | d8fa6da3799f8dcf17aa224f46a7c840f0f884d4 (patch) | |
| tree | 72de43d56fb5f0879993548ca5f52060c82c9000 /java/client | |
| parent | d612f1f5094e966455cf7705588a50205a1f3fb6 (diff) | |
| download | qpid-python-d8fa6da3799f8dcf17aa224f46a7c840f0f884d4.tar.gz | |
QPID-3317 Modified the code to implement correct behavior for link
bindings. Added unit tests for Address Helper and two specific test
cases for verifying link behavior (bindings and customization of
subscription queues).
Review request : https://reviews.apache.org/r/7412/
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1397651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
14 files changed, 589 insertions, 341 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 096738f9ad..f14b6d810b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -112,16 +112,6 @@ public abstract class AMQDestination implements Destination, Referenceable _name = name; } - protected Link getTargetLink() - { - return _targetLink; - } - - protected void setTargetLink(Link targetLink) - { - _targetLink = targetLink; - } - // ----- Fields required to support new address syntax ------- public enum DestSyntax { @@ -186,9 +176,7 @@ public abstract class AMQDestination implements Destination, Referenceable private AddressOption _assert = AddressOption.NEVER; private AddressOption _delete = AddressOption.NEVER; - private Node _targetNode; - private Node _sourceNode; - private Link _targetLink; + private Node _node; private Link _link; @@ -823,24 +811,14 @@ public abstract class AMQDestination implements Destination, Referenceable _delete = option; } - public Node getTargetNode() + public Node getNode() { - return _targetNode; + return _node; } - public void setTargetNode(Node node) + public void setNode(Node node) { - _targetNode = node; - } - - public Node getSourceNode() - { - return _sourceNode; - } - - public void setSourceNode(Node node) - { - _sourceNode = node; + _node = node; } public Link getLink() @@ -901,21 +879,11 @@ public abstract class AMQDestination implements Destination, Referenceable _browseOnly = _addrHelper.isBrowseOnly(); - _addressType = _addrHelper.getTargetNodeType(); - _targetNode = _addrHelper.getTargetNode(_addressType); - _sourceNode = _addrHelper.getSourceNode(_addressType); + _addressType = _addrHelper.getNodeType(); + _node = _addrHelper.getNode(); _link = _addrHelper.getLink(); } - // This method is needed if we didn't know the node type at the beginning. - // Therefore we have to query the broker to figure out the type. - // Once the type is known we look for the necessary properties. - public void rebuildTargetAndSourceNodes(int addressType) - { - _targetNode = _addrHelper.getTargetNode(addressType); - _sourceNode = _addrHelper.getSourceNode(addressType); - } - // ----- / new address syntax ----------- public boolean isBrowseOnly() @@ -950,8 +918,7 @@ public abstract class AMQDestination implements Destination, Referenceable dest.setDelete(_delete); dest.setBrowseOnly(_browseOnly); dest.setAddressType(_addressType); - dest.setTargetNode(_targetNode); - dest.setSourceNode(_sourceNode); + dest.setNode(_node); dest.setLink(_link); dest.setAddressResolved(_addressResolved.get()); return dest; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 49964639e4..91a6389214 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -999,12 +999,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { try { - handleAddressBasedDestination(dest,false,noLocal,true); + resolveAddress(dest,false,noLocal); if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) { throw new JMSException("Durable subscribers can only be created for Topics"); } - dest.getSourceNode().setDurable(true); } catch(AMQException e) { @@ -2840,7 +2839,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (amqd.getDestSyntax() == DestSyntax.ADDR) { - handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait); + resolveAddress(amqd,true,consumer.isNoLocal()); } else { @@ -2899,10 +2898,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - public abstract void handleAddressBasedDestination(AMQDestination dest, + public abstract void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException; + boolean noLocal) throws AMQException; private void registerProducer(long producerId, MessageProducer producer) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index e5f3e33d15..e271436c21 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -17,6 +17,11 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.transport.Option.BATCH; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.UNRELIABLE; + import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collections; @@ -29,8 +34,10 @@ import java.util.Timer; import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; + import javax.jms.Destination; import javax.jms.JMSException; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -44,18 +51,31 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; +import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.*; - -import static org.apache.qpid.transport.Option.BATCH; -import static org.apache.qpid.transport.Option.NONE; -import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.QueueQueryResult; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.RangeSetFactory; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; import org.slf4j.Logger; @@ -347,9 +367,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { + // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected. List<Binding> bindings = new ArrayList<Binding>(); - bindings.addAll(destination.getSourceNode().getBindings()); - bindings.addAll(destination.getTargetNode().getBindings()); + bindings.addAll(destination.getNode().getBindings()); String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? destination.getAddressName(): "amq.topic"; @@ -599,7 +619,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException - { + { + if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) + { + if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) + { + createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal()); + queueName = consumer.getDestination().getAMQQueueName(); + consumer.setQueuename(queueName); + } + handleLinkCreation(consumer.getDestination()); + } boolean preAcquire = consumer.isPreAcquire(); AMQDestination destination = consumer.getDestination(); @@ -642,11 +672,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic capacity, Option.UNRELIABLE); } - - if (!nowait) - { - sync(); - } + sync(); } /** @@ -753,7 +779,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - QueueNode node = (QueueNode)amqd.getSourceNode(); + // This code is here to ensure address based destination work with the declareQueue public method in AMQSession.java + Node node = amqd.getNode(); Map<String,Object> arguments = new HashMap<String,Object>(); arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs()); if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null) @@ -1065,11 +1092,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } - public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode) + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) { boolean match = true; ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); match = !result.getNotFound(); + Node node = dest.getNode(); if (match) { @@ -1079,16 +1107,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (node.getExchangeType() != null && node.getExchangeType().equals(result.getType())) && (matchProps(result.getArguments(),node.getDeclareArgs())); - } - else if (node.getExchangeType() != null) - { - // even if assert is false, better to verify this - match = node.getExchangeType().equals(result.getType()); - if (!match) - { - _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() + - " actual " + result.getType()); - } } else { @@ -1097,18 +1115,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setExchangeClass(new AMQShortString(result.getType())); } } - return match; } - public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { boolean match = true; try { QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); match = dest.getAddressName().equals(result.getQueue()); - + Node node = dest.getNode(); + if (match && assertNode) { match = (result.getDurable() == node.isDurable()) && @@ -1133,7 +1151,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic "Error querying queue",e); } } - return match; } @@ -1172,17 +1189,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ @SuppressWarnings("deprecation") - public void handleAddressBasedDestination(AMQDestination dest, + public void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException + boolean noLocal) throws AMQException { if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) { - if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) - { - createSubscriptionQueue(dest,noLocal); - } + return; } else { @@ -1202,50 +1215,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { case AMQDestination.QUEUE_TYPE: { - if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode)) + if (isQueueExist(dest,assertNode)) { - setLegacyFiledsForQueueType(dest); + setLegacyFieldsForQueueType(dest); break; } else if(createNode) { - setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,noLocal,noWait,false); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); + setLegacyFieldsForQueueType(dest); + handleQueueNodeCreation(dest,noLocal); break; } } case AMQDestination.TOPIC_TYPE: { - if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode)) + if (isExchangeExist(dest,assertNode)) { setLegacyFiledsForTopicType(dest); verifySubject(dest); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest, noLocal); - } break; } else if(createNode) { setLegacyFiledsForTopicType(dest); verifySubject(dest); - sendExchangeDeclare(dest.getAddressName(), - dest.getExchangeClass().asString(), - dest.getTargetNode().getAlternateExchange(), - dest.getTargetNode().getDeclareArgs(), - false, - dest.getTargetNode().isDurable(), - dest.getTargetNode().isAutoDelete()); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest,noLocal); - } - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); + handleExchangeNodeCreation(dest); break; } } @@ -1284,7 +1279,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic throw new AMQException("Ambiguous address, please specify queue or topic as node type"); } dest.setAddressType(type); - dest.rebuildTargetAndSourceNodes(type); return type; } } @@ -1307,29 +1301,37 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException + void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException { - QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null - - if (dest.getQueueName() == null) + Link link = dest.getLink(); + String queueName = dest.getQueueName(); + + if (queueName == null) { - if (dest.getLink() != null && dest.getLink().getName() != null) - { - dest.setQueueName(new AMQShortString(dest.getLink().getName())); - } + queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName(); + dest.setQueueName(new AMQShortString(queueName)); } - node.setExclusive(true); - node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,noLocal,true,false); - getQpidSession().exchangeBind(dest.getQueueName(), + + SubscriptionQueue queueProps = link.getSubscriptionQueue(); + Map<String,Object> arguments = queueProps.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + getQpidSession().queueDeclare(queueName, + queueProps.getAlternateExchange(), arguments, + queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + link.isDurable() ? Option.DURABLE : Option.NONE, + queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + getQpidSession().exchangeBind(queueName, dest.getAddressName(), dest.getSubject(), Collections.<String,Object>emptyMap()); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); } - public void setLegacyFiledsForQueueType(AMQDestination dest) + public void setLegacyFieldsForQueueType(AMQDestination dest) { // legacy support dest.setQueueName(new AMQShortString(dest.getAddressName())); @@ -1342,7 +1344,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { // legacy support dest.setExchangeName(new AMQShortString(dest.getAddressName())); - ExchangeNode node = (ExchangeNode)dest.getTargetNode(); + Node node = dest.getNode(); dest.setExchangeClass(node.getExchangeType() == null? ExchangeDefaults.TOPIC_EXCHANGE_CLASS: new AMQShortString(node.getExchangeType())); @@ -1436,5 +1438,144 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic flushTask = null; } } -} + private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException + { + Node node = dest.getNode(); + Map<String,Object> arguments = node.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + getQpidSession().queueDeclare(dest.getAddressName(), + node.getAlternateExchange(), arguments, + node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + node.isDurable() ? Option.DURABLE : Option.NONE, + node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + { + Node node = dest.getNode(); + sendExchangeDeclare(dest.getAddressName(), + node.getExchangeType(), + node.getAlternateExchange(), + node.getDeclareArgs(), + false, + node.isDurable(), + node.isAutoDelete()); + + // If bindings are specified without a queue name and is called by the producer, + // the broker will send an exception as expected. + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleLinkCreation(AMQDestination dest) throws AMQException + { + createBindings(dest, dest.getLink().getBindings()); + } + + void createBindings(AMQDestination dest, List<Binding> bindings) + { + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (Binding binding: bindings) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + getQpidSession().exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + getQpidSession().exchangeUnbind(queue, exchange, + binding.getBindingKey()); + } + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest, false)) + { + getQpidSession().queueDelete(dest.getQueueName()); + } + } + + void handleNodeDelete(AMQDestination dest) throws AMQException + { + if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) + { + if (isExchangeExist(dest,false)) + { + getQpidSession().exchangeDelete(dest.getAddressName()); + } + } + else + { + if (isQueueExist(dest,false)) + { + getQpidSession().queueDelete(dest.getAddressName()); + } + } + } +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index ccae5e31e5..3097b33da3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -695,10 +695,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } - public void handleAddressBasedDestination(AMQDestination dest, + public void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException + boolean noLocal) throws AMQException { throw new UnsupportedOperationException("The new addressing based syntax is " + "not supported for AMQP 0-8/0-9 versions"); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index f09ef5e01d..51b6c7e478 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -114,8 +114,8 @@ public class AMQTopic extends AMQDestination implements Topic AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); // link is never null if dest was created using an address string. t.getLink().setName(queueName.asString()); - t.getSourceNode().setAutoDelete(false); - t.getSourceNode().setDurable(true); + t.getLink().getSubscriptionQueue().setAutoDelete(false); + t.getLink().setDurable(true); // The legacy fields are also populated just in case. t.setQueueName(queueName); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index f8e837cd34..b5e008da5a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -593,7 +593,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { sendCancel(); } - cleanupQueue(); } } catch (AMQException e) @@ -631,8 +630,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } abstract void sendCancel() throws AMQException, FailoverException; - - abstract void cleanupQueue() throws AMQException, FailoverException; /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ca5b1ac9c1..902770d901 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -95,6 +95,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _capacity = evaluateCapacity(destination); + // This is due to the Destination carrying the temporary subscription name which is incorrect. if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; @@ -163,6 +164,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM @Override void sendCancel() throws AMQException { _0_10session.getQpidSession().messageCancel(getConsumerTagString()); + postSubscription(); try { _0_10session.getQpidSession().sync(); @@ -499,7 +501,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - void cleanupQueue() throws AMQException, FailoverException + void postSubscription() throws AMQException { AMQDestination dest = this.getDestination(); if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) @@ -507,9 +509,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.RECEIVER ) { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - this.getDestination().getQueueName()); + ((AMQSession_0_10) getSession()).handleNodeDelete(dest); } + // Subscription queue is handled as part of linkDelete method. + ((AMQSession_0_10) getSession()).handleLinkDelete(dest); } } @@ -559,4 +562,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return capacity; } -} +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index fc7eacc760..f733e6bbca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -127,11 +127,6 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe return receive(); } - void cleanupQueue() throws AMQException, FailoverException - { - - } - public RejectBehaviour getRejectBehaviour() { return _rejectBehaviour; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 3b4f642d4c..f717ca4655 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -86,7 +86,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { try { - getSession().handleAddressBasedDestination(destination,false,false,false); + getSession().resolveAddress(destination,false,false); + ((AMQSession_0_10)getSession()).handleLinkCreation(destination); + ((AMQSession_0_10)getSession()).sync(); } catch(Exception e) { @@ -251,25 +253,35 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer return getSession().isQueueBound(destination); } + // We should have a close and closed method to distinguish between normal close + // and a close due to session or connection error. @Override public void close() throws JMSException { super.close(); AMQDestination dest = getAMQDestination(); - if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + AMQSession_0_10 ssn = (AMQSession_0_10) getSession(); + if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) + try { - try - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - getAMQDestination().getQueueName()); - } - catch(TransportException e) + if (dest.getDelete() == AddressOption.ALWAYS || + dest.getDelete() == AddressOption.SENDER ) { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + ssn.handleNodeDelete(dest); } + ssn.handleLinkDelete(dest); + } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } + catch (AMQException e) + { + JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); + ex.setLinkedException(e); + ex.initCause(e); + throw ex; } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index a0c3914127..ee39ecd7fc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -344,7 +344,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd); if (type == AMQDestination.QUEUE_TYPE) { - ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd); + ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 318fe32d36..9b291b48f7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -20,21 +20,20 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Link.Subscription; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; +import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.configuration.Accessor; import org.apache.qpid.configuration.Accessor.MapAccessor; import org.apache.qpid.messaging.Address; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - /** * Utility class for extracting information from the address class */ @@ -68,58 +67,56 @@ public class AddressHelper public static final String ARGUMENTS = "arguments"; public static final String RELIABILITY = "reliability"; - private Address address; - private Accessor addressProps; - private Accessor nodeProps; - private Accessor linkProps; + private Address _address; + private Accessor _addressPropAccess; + private Accessor _nodePropAccess; + private Accessor _linkPropAccess; + private Map _addressPropMap; + private Map _nodePropMap; + private Map _linkPropMap; public AddressHelper(Address address) { - this.address = address; - addressProps = new MapAccessor(address.getOptions()); - Map node_props = address.getOptions() == null + this._address = address; + this._addressPropMap = address.getOptions(); + this._addressPropAccess = new MapAccessor(_addressPropMap); + this._nodePropMap = address.getOptions() == null || address.getOptions().get(NODE) == null ? null : (Map) address.getOptions().get(NODE); - if (node_props != null) + if (_nodePropMap != null) { - nodeProps = new MapAccessor(node_props); + _nodePropAccess = new MapAccessor(_nodePropMap); } - Map link_props = address.getOptions() == null + this._linkPropMap = address.getOptions() == null || address.getOptions().get(LINK) == null ? null : (Map) address.getOptions().get(LINK); - if (link_props != null) + if (_linkPropMap != null) { - linkProps = new MapAccessor(link_props); + _linkPropAccess = new MapAccessor(_linkPropMap); } } public String getCreate() { - return addressProps.getString(CREATE); + return _addressPropAccess.getString(CREATE); } public String getAssert() { - return addressProps.getString(ASSERT); + return _addressPropAccess.getString(ASSERT); } public String getDelete() { - return addressProps.getString(DELETE); - } - - public boolean isNoLocal() - { - Boolean b = nodeProps.getBoolean(NO_LOCAL); - return b == null ? false : b; + return _addressPropAccess.getString(DELETE); } public boolean isBrowseOnly() { - String mode = addressProps.getString(MODE); + String mode = _addressPropAccess.getString(MODE); return mode != null && mode.equals(BROWSE) ? true : false; } @@ -127,7 +124,7 @@ public class AddressHelper public List<Binding> getBindings(Map props) { List<Binding> bindings = new ArrayList<Binding>(); - List<Map> bindingList = (List<Map>) props.get(X_BINDINGS); + List<Map> bindingList = (props == null) ? Collections.EMPTY_LIST : (List<Map>) props.get(X_BINDINGS); if (bindingList != null) { for (Map bindingMap : bindingList) @@ -157,117 +154,70 @@ public class AddressHelper } } - public int getTargetNodeType() throws Exception + public int getNodeType() throws Exception { - if (nodeProps == null || nodeProps.getString(TYPE) == null) + if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null) { // need to query and figure out return AMQDestination.UNKNOWN_TYPE; - } else if (nodeProps.getString(TYPE).equals("queue")) + } + else if (_nodePropAccess.getString(TYPE).equals("queue")) { return AMQDestination.QUEUE_TYPE; - } else if (nodeProps.getString(TYPE).equals("topic")) + } + else if (_nodePropAccess.getString(TYPE).equals("topic")) { return AMQDestination.TOPIC_TYPE; - } else + } + else { throw new Exception("unkown exchange type"); } } - public Node getTargetNode(int addressType) + public Node getNode() { - // target node here is the default exchange - if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE) - { - return new ExchangeNode(); - } else if (addressType == AMQDestination.TOPIC_TYPE) - { - Map node = (Map) address.getOptions().get(NODE); - return createExchangeNode(node); - } else + Node node = new Node(_address.getName()); + if (_nodePropAccess != null) { - // don't know yet - return null; - } - } - - private Node createExchangeNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - ExchangeNode node = new ExchangeNode(); - node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap - .getString(TYPE)); - fillInCommonNodeArgs(node, parent, argsMap); - return node; - } + Map xDeclareMap = getDeclareArgs(_nodePropMap); + MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); - private Node createQueueNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - QueueNode node = new QueueNode(); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false - : argsMap.getBoolean(EXCLUSIVE)); - fillInCommonNodeArgs(node, parent, argsMap); - - return node; - } - - private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap) - { - node.setDurable(getDurability(parent)); - node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false - : argsMap.getBoolean(AUTO_DELETE)); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setBindings(getBindings(parent)); - if (getDeclareArgs(parent).containsKey(ARGUMENTS)) - { - node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS)); + node.setDurable(getBooleanProperty(_nodePropAccess,DURABLE,false)); + node.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,false)); + node.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,false)); + node.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); + if (xDeclareMapAccessor.getString(TYPE) != null) + { + node.setExchangeType(xDeclareMapAccessor.getString(TYPE)); + } + node.setBindings(getBindings(_nodePropMap)); + if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS)) + { + node.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS)); + } } - } - - private boolean getDurability(Map map) - { - Accessor access = new MapAccessor(map); - Boolean result = access.getBoolean(DURABLE); - return (result == null) ? false : result.booleanValue(); + return node; } - /** - * if the type == queue x-declare args from the node props is used. if the - * type == exchange x-declare args from the link props is used else just - * create a default temp queue. - */ - public Node getSourceNode(int addressType) + // This should really be in the Accessor interface + private boolean getBooleanProperty(Accessor access, String propName, boolean defaultValue) { - if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) - { - return createQueueNode((Map) address.getOptions().get(NODE)); - } - if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null) - { - return createQueueNode((Map) address.getOptions().get(LINK)); - } else - { - // need to query the info - return new QueueNode(); - } + Boolean result = access.getBoolean(propName); + return (result == null) ? defaultValue : result.booleanValue(); } public Link getLink() throws Exception { Link link = new Link(); link.setSubscription(new Subscription()); - if (linkProps != null) + link.setSubscriptionQueue(new SubscriptionQueue()); + if (_linkPropAccess != null) { - link.setDurable(linkProps.getBoolean(DURABLE) == null ? false - : linkProps.getBoolean(DURABLE)); - link.setName(linkProps.getString(NAME)); + link.setDurable(getBooleanProperty(_linkPropAccess,DURABLE,false)); + link.setName(_linkPropAccess.getString(NAME)); - String reliability = linkProps.getString(RELIABILITY); + String reliability = _linkPropAccess.getString(RELIABILITY); if ( reliability != null) { if (reliability.equalsIgnoreCase("unreliable")) @@ -283,13 +233,12 @@ public class AddressHelper throw new Exception("The reliability mode '" + reliability + "' is not yet supported"); } - } - if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) + if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) { MapAccessor capacityProps = new MapAccessor( - (Map) ((Map) address.getOptions().get(LINK)) + (Map) ((Map) _address.getOptions().get(LINK)) .get(CAPACITY)); link .setConsumerCapacity(capacityProps @@ -302,17 +251,19 @@ public class AddressHelper } else { - int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps + int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess .getInt(CAPACITY); link.setConsumerCapacity(cap); link.setProducerCapacity(cap); } - link.setFilter(linkProps.getString(FILTER)); + link.setFilter(_linkPropAccess.getString(FILTER)); // so far filter type not used - if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) + Map linkMap = (Map) _address.getOptions().get(LINK); + + if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE)) { - Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); + Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE); if (x_subscribe.containsKey(ARGUMENTS)) { @@ -324,6 +275,18 @@ public class AddressHelper link.getSubscription().setExclusive(exclusive); } + + link.setBindings(getBindings(linkMap)); + Map xDeclareMap = getDeclareArgs(linkMap); + SubscriptionQueue queue = link.getSubscriptionQueue(); + if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS)) + { + MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); + queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true)); + queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true)); + queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); + queue.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS)); + } } return link; diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 41f6725c8f..40a84ebd02 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.qpid.client.AMQDestination.Binding; + public class Link { public enum FilterType { SQL92, XQUERY, SUBJECT } @@ -36,10 +41,11 @@ public class Link private boolean _isDurable; private int _consumerCapacity = 0; private int _producerCapacity = 0; - private Node node; private Subscription subscription; private Reliability reliability = Reliability.AT_LEAST_ONCE; - + private List<Binding> _bindings = new ArrayList<Binding>(); + private SubscriptionQueue _subscriptionQueue; + public Reliability getReliability() { return reliability; @@ -50,21 +56,11 @@ public class Link this.reliability = reliability; } - public Node getNode() - { - return node; - } - - public void setNode(Node node) - { - this.node = node; - } - public boolean isDurable() { return _isDurable; } - + public void setDurable(boolean durable) { _isDurable = durable; @@ -139,6 +135,74 @@ public class Link { this.subscription = subscription; } + + public List<Binding> getBindings() + { + return _bindings; + } + + public void setBindings(List<Binding> bindings) + { + _bindings = bindings; + } + + public SubscriptionQueue getSubscriptionQueue() + { + return _subscriptionQueue; + } + + public void setSubscriptionQueue(SubscriptionQueue subscriptionQueue) + { + this._subscriptionQueue = subscriptionQueue; + } + + public static class SubscriptionQueue + { + private Map<String,Object> _declareArgs = new HashMap<String,Object>(); + private boolean _isAutoDelete = true; + private boolean _isExclusive = true; + private String _alternateExchange; + + public Map<String,Object> getDeclareArgs() + { + return _declareArgs; + } + + public void setDeclareArgs(Map<String,Object> options) + { + _declareArgs = options; + } + + public boolean isAutoDelete() + { + return _isAutoDelete; + } + + public void setAutoDelete(boolean autoDelete) + { + _isAutoDelete = autoDelete; + } + + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + + public String getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(String altExchange) + { + _alternateExchange = altExchange; + } + } public static class Subscription { diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java index 0da0327885..005f98f344 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -26,19 +26,33 @@ import org.apache.qpid.client.AMQDestination.Binding; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -public abstract class Node +public class Node { private int _nodeType = AMQDestination.UNKNOWN_TYPE; + private String _name; private boolean _isDurable; private boolean _isAutoDelete; + private boolean _isExclusive; private String _alternateExchange; + private String _exchangeType = "topic"; // used when node is an exchange instead of a queue. private List<Binding> _bindings = new ArrayList<Binding>(); - private Map<String,Object> _declareArgs = Collections.emptyMap(); + private Map<String,Object> _declareArgs = new HashMap<String,Object>(); - protected Node(int nodeType) + protected Node(String name) + { + _name = name; + } + + public String getName() + { + return _name; + } + + public void setNodeType(int nodeType) { _nodeType = nodeType; } @@ -58,6 +72,16 @@ public abstract class Node _isDurable = durable; } + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + public boolean isAutoDelete() { return _isAutoDelete; @@ -100,56 +124,15 @@ public abstract class Node public void setDeclareArgs(Map<String,Object> options) { _declareArgs = options; - } - - public static class QueueNode extends Node - { - private boolean _isExclusive; - private QpidQueueOptions _queueOptions = new QpidQueueOptions(); - - public QueueNode() - { - super(AMQDestination.QUEUE_TYPE); - } - - public boolean isExclusive() - { - return _isExclusive; - } - - public void setExclusive(boolean exclusive) - { - _isExclusive = exclusive; - } } - - public static class ExchangeNode extends Node - { - private QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); - private String _exchangeType; - - public ExchangeNode() - { - super(AMQDestination.TOPIC_TYPE); - } - - public String getExchangeType() - { - return _exchangeType; - } - - public void setExchangeType(String exchangeType) - { - _exchangeType = exchangeType; - } - + + public void setExchangeType(String type) + { + _exchangeType = type; } - - public static class UnknownNodeType extends Node + + public String getExchangeType() { - public UnknownNodeType() - { - super(AMQDestination.UNKNOWN_TYPE); - } + return _exchangeType; } } diff --git a/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java b/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java new file mode 100644 index 0000000000..a602dcbfd4 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java @@ -0,0 +1,126 @@ +package org.apache.qpid.client.messaging.address; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQDestination.AddressOption; +import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Link.Reliability; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.test.utils.QpidTestCase; + +public class AddressHelperTest extends QpidTestCase +{ + public void testAddressOptions() throws Exception + { + Address addr = Address.parse("queue/test;{create:sender, assert:always, delete:receiver, mode:browse}"); + AddressHelper helper = new AddressHelper(addr); + assertEquals(AddressOption.SENDER,AddressOption.getOption(helper.getCreate())); + assertEquals(AddressOption.ALWAYS,AddressOption.getOption(helper.getAssert())); + assertEquals(AddressOption.RECEIVER,AddressOption.getOption(helper.getDelete())); + assertTrue("'mode' option wasn't read properly",helper.isBrowseOnly()); + } + + public void testNodeProperties() throws Exception + { + Address addr = Address.parse("my-queue;{" + + "node: " + + "{" + + "type: queue ," + + "durable: true ," + + "x-declare: " + + "{" + + "exclusive: true," + + "auto-delete: true," + + "alternate-exchange: 'amq.fanout'," + + "arguments: {" + + "'qpid.max_size': 1000," + + "'qpid.max_count': 100" + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', queue:my-queue, key : test}, " + + "{exchange : 'amq.fanout', queue:my-queue}," + + "{exchange: 'amq.match', queue:my-queue, arguments: {x-match: any, dep: sales, loc: CA}}," + + "{exchange : 'amq.topic',queue:my-queue, key : 'a.#'}" + + "]" + + + "}" + + "}"); + AddressHelper helper = new AddressHelper(addr); + Node node = helper.getNode(); + assertEquals("'type' property wasn't read properly",AMQDestination.QUEUE_TYPE,helper.getNodeType()); + assertTrue("'durable' property wasn't read properly",node.isDurable()); + assertTrue("'auto-delete' property wasn't read properly",node.isAutoDelete()); + assertTrue("'exclusive' property wasn't read properly",node.isExclusive()); + assertEquals("'alternate-exchange' property wasn't read properly","amq.fanout",node.getAlternateExchange()); + assertEquals("'arguments' in 'x-declare' property wasn't read properly",2,node.getDeclareArgs().size()); + assertEquals("'bindings' property wasn't read properly",4,node.getBindings().size()); + for (Binding binding: node.getBindings()) + { + assertTrue("property 'exchange' in bindings wasn't read properly",binding.getExchange().startsWith("amq.")); + assertEquals("property 'queue' in bindings wasn't read properly","my-queue",binding.getQueue()); + if (binding.getExchange().equals("amq.direct")) + { + assertEquals("'key' property in bindings wasn't read properly","test",binding.getBindingKey()); + } + if (binding.getExchange().equals("amq.match")) + { + assertEquals("'arguments' property in bindings wasn't read properly",3,binding.getArgs().size()); + } + } + } + + public void testLinkProperties() throws Exception + { + Address addr = Address.parse("my-queue;{" + + "link: " + + "{" + + "name: my-queue ," + + "durable: true ," + + "reliability: at-least-once," + + "capacity: {source:10, target:15}," + + "x-declare: " + + "{" + + "exclusive: true," + + "auto-delete: true," + + "alternate-exchange: 'amq.fanout'," + + "arguments: {" + + "'qpid.max_size': 1000," + + "'qpid.max_count': 100" + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', queue:my-queue, key : test}, " + + "{exchange : 'amq.fanout', queue:my-queue}," + + "{exchange: 'amq.match', queue:my-queue, arguments: {x-match: any, dep: sales, loc: CA}}," + + "{exchange : 'amq.topic',queue:my-queue, key : 'a.#'}" + + "]," + + "x-subscribes:{exclusive: true, arguments: {a:b,x:y}}" + + "}" + + "}"); + + AddressHelper helper = new AddressHelper(addr); + Link link = helper.getLink(); + assertEquals("'name' property wasn't read properly","my-queue",link.getName()); + assertTrue("'durable' property wasn't read properly",link.isDurable()); + assertEquals("'reliability' property wasn't read properly",Reliability.AT_LEAST_ONCE,link.getReliability()); + assertTrue("'auto-delete' property in 'x-declare' wasn't read properly",link.getSubscriptionQueue().isAutoDelete()); + assertTrue("'exclusive' property in 'x-declare' wasn't read properly",link.getSubscriptionQueue().isExclusive()); + assertEquals("'alternate-exchange' property in 'x-declare' wasn't read properly","amq.fanout",link.getSubscriptionQueue().getAlternateExchange()); + assertEquals("'arguments' in 'x-declare' property wasn't read properly",2,link.getSubscriptionQueue().getDeclareArgs().size()); + assertEquals("'bindings' property wasn't read properly",4,link.getBindings().size()); + for (Binding binding: link.getBindings()) + { + assertTrue("property 'exchange' in bindings wasn't read properly",binding.getExchange().startsWith("amq.")); + assertEquals("property 'queue' in bindings wasn't read properly","my-queue",binding.getQueue()); + if (binding.getExchange().equals("amq.direct")) + { + assertEquals("'key' property in bindings wasn't read properly","test",binding.getBindingKey()); + } + if (binding.getExchange().equals("amq.match")) + { + assertEquals("'arguments' property in bindings wasn't read properly",3,binding.getArgs().size()); + } + } + assertTrue("'exclusive' property in 'x-subscribe' wasn't read properly",link.getSubscription().isExclusive()); + assertEquals("'arguments' in 'x-subscribe' property wasn't read properly",2,link.getSubscription().getArgs().size()); + } + +} |
