diff options
Diffstat (limited to 'java')
15 files changed, 678 insertions, 382 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 096738f9ad..f14b6d810b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -112,16 +112,6 @@ public abstract class AMQDestination implements Destination, Referenceable _name = name; } - protected Link getTargetLink() - { - return _targetLink; - } - - protected void setTargetLink(Link targetLink) - { - _targetLink = targetLink; - } - // ----- Fields required to support new address syntax ------- public enum DestSyntax { @@ -186,9 +176,7 @@ public abstract class AMQDestination implements Destination, Referenceable private AddressOption _assert = AddressOption.NEVER; private AddressOption _delete = AddressOption.NEVER; - private Node _targetNode; - private Node _sourceNode; - private Link _targetLink; + private Node _node; private Link _link; @@ -823,24 +811,14 @@ public abstract class AMQDestination implements Destination, Referenceable _delete = option; } - public Node getTargetNode() + public Node getNode() { - return _targetNode; + return _node; } - public void setTargetNode(Node node) + public void setNode(Node node) { - _targetNode = node; - } - - public Node getSourceNode() - { - return _sourceNode; - } - - public void setSourceNode(Node node) - { - _sourceNode = node; + _node = node; } public Link getLink() @@ -901,21 +879,11 @@ public abstract class AMQDestination implements Destination, Referenceable _browseOnly = _addrHelper.isBrowseOnly(); - _addressType = _addrHelper.getTargetNodeType(); - _targetNode = _addrHelper.getTargetNode(_addressType); - _sourceNode = _addrHelper.getSourceNode(_addressType); + _addressType = _addrHelper.getNodeType(); + _node = _addrHelper.getNode(); _link = _addrHelper.getLink(); } - // This method is needed if we didn't know the node type at the beginning. - // Therefore we have to query the broker to figure out the type. - // Once the type is known we look for the necessary properties. - public void rebuildTargetAndSourceNodes(int addressType) - { - _targetNode = _addrHelper.getTargetNode(addressType); - _sourceNode = _addrHelper.getSourceNode(addressType); - } - // ----- / new address syntax ----------- public boolean isBrowseOnly() @@ -950,8 +918,7 @@ public abstract class AMQDestination implements Destination, Referenceable dest.setDelete(_delete); dest.setBrowseOnly(_browseOnly); dest.setAddressType(_addressType); - dest.setTargetNode(_targetNode); - dest.setSourceNode(_sourceNode); + dest.setNode(_node); dest.setLink(_link); dest.setAddressResolved(_addressResolved.get()); return dest; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 49964639e4..91a6389214 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -999,12 +999,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { try { - handleAddressBasedDestination(dest,false,noLocal,true); + resolveAddress(dest,false,noLocal); if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) { throw new JMSException("Durable subscribers can only be created for Topics"); } - dest.getSourceNode().setDurable(true); } catch(AMQException e) { @@ -2840,7 +2839,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (amqd.getDestSyntax() == DestSyntax.ADDR) { - handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait); + resolveAddress(amqd,true,consumer.isNoLocal()); } else { @@ -2899,10 +2898,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - public abstract void handleAddressBasedDestination(AMQDestination dest, + public abstract void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException; + boolean noLocal) throws AMQException; private void registerProducer(long producerId, MessageProducer producer) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index e5f3e33d15..e271436c21 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -17,6 +17,11 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.transport.Option.BATCH; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.UNRELIABLE; + import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collections; @@ -29,8 +34,10 @@ import java.util.Timer; import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; + import javax.jms.Destination; import javax.jms.JMSException; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -44,18 +51,31 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; +import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.*; - -import static org.apache.qpid.transport.Option.BATCH; -import static org.apache.qpid.transport.Option.NONE; -import static org.apache.qpid.transport.Option.SYNC; -import static org.apache.qpid.transport.Option.UNRELIABLE; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.QueueQueryResult; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.RangeSetFactory; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Serial; import org.apache.qpid.util.Strings; import org.slf4j.Logger; @@ -347,9 +367,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { + // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected. List<Binding> bindings = new ArrayList<Binding>(); - bindings.addAll(destination.getSourceNode().getBindings()); - bindings.addAll(destination.getTargetNode().getBindings()); + bindings.addAll(destination.getNode().getBindings()); String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? destination.getAddressName(): "amq.topic"; @@ -599,7 +619,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException - { + { + if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) + { + if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) + { + createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal()); + queueName = consumer.getDestination().getAMQQueueName(); + consumer.setQueuename(queueName); + } + handleLinkCreation(consumer.getDestination()); + } boolean preAcquire = consumer.isPreAcquire(); AMQDestination destination = consumer.getDestination(); @@ -642,11 +672,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic capacity, Option.UNRELIABLE); } - - if (!nowait) - { - sync(); - } + sync(); } /** @@ -753,7 +779,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - QueueNode node = (QueueNode)amqd.getSourceNode(); + // This code is here to ensure address based destination work with the declareQueue public method in AMQSession.java + Node node = amqd.getNode(); Map<String,Object> arguments = new HashMap<String,Object>(); arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs()); if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null) @@ -1065,11 +1092,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } - public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode) + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) { boolean match = true; ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); match = !result.getNotFound(); + Node node = dest.getNode(); if (match) { @@ -1079,16 +1107,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (node.getExchangeType() != null && node.getExchangeType().equals(result.getType())) && (matchProps(result.getArguments(),node.getDeclareArgs())); - } - else if (node.getExchangeType() != null) - { - // even if assert is false, better to verify this - match = node.getExchangeType().equals(result.getType()); - if (!match) - { - _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() + - " actual " + result.getType()); - } } else { @@ -1097,18 +1115,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setExchangeClass(new AMQShortString(result.getType())); } } - return match; } - public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { boolean match = true; try { QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); match = dest.getAddressName().equals(result.getQueue()); - + Node node = dest.getNode(); + if (match && assertNode) { match = (result.getDurable() == node.isDurable()) && @@ -1133,7 +1151,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic "Error querying queue",e); } } - return match; } @@ -1172,17 +1189,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ @SuppressWarnings("deprecation") - public void handleAddressBasedDestination(AMQDestination dest, + public void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException + boolean noLocal) throws AMQException { if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) { - if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) - { - createSubscriptionQueue(dest,noLocal); - } + return; } else { @@ -1202,50 +1215,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { case AMQDestination.QUEUE_TYPE: { - if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode)) + if (isQueueExist(dest,assertNode)) { - setLegacyFiledsForQueueType(dest); + setLegacyFieldsForQueueType(dest); break; } else if(createNode) { - setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,noLocal,noWait,false); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); + setLegacyFieldsForQueueType(dest); + handleQueueNodeCreation(dest,noLocal); break; } } case AMQDestination.TOPIC_TYPE: { - if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode)) + if (isExchangeExist(dest,assertNode)) { setLegacyFiledsForTopicType(dest); verifySubject(dest); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest, noLocal); - } break; } else if(createNode) { setLegacyFiledsForTopicType(dest); verifySubject(dest); - sendExchangeDeclare(dest.getAddressName(), - dest.getExchangeClass().asString(), - dest.getTargetNode().getAlternateExchange(), - dest.getTargetNode().getDeclareArgs(), - false, - dest.getTargetNode().isDurable(), - dest.getTargetNode().isAutoDelete()); - if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) - { - createSubscriptionQueue(dest,noLocal); - } - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); + handleExchangeNodeCreation(dest); break; } } @@ -1284,7 +1279,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic throw new AMQException("Ambiguous address, please specify queue or topic as node type"); } dest.setAddressType(type); - dest.rebuildTargetAndSourceNodes(type); return type; } } @@ -1307,29 +1301,37 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException + void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException { - QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null - - if (dest.getQueueName() == null) + Link link = dest.getLink(); + String queueName = dest.getQueueName(); + + if (queueName == null) { - if (dest.getLink() != null && dest.getLink().getName() != null) - { - dest.setQueueName(new AMQShortString(dest.getLink().getName())); - } + queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName(); + dest.setQueueName(new AMQShortString(queueName)); } - node.setExclusive(true); - node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,noLocal,true,false); - getQpidSession().exchangeBind(dest.getQueueName(), + + SubscriptionQueue queueProps = link.getSubscriptionQueue(); + Map<String,Object> arguments = queueProps.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + getQpidSession().queueDeclare(queueName, + queueProps.getAlternateExchange(), arguments, + queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + link.isDurable() ? Option.DURABLE : Option.NONE, + queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + getQpidSession().exchangeBind(queueName, dest.getAddressName(), dest.getSubject(), Collections.<String,Object>emptyMap()); - sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), - null,dest.getExchangeName(),dest, false); } - public void setLegacyFiledsForQueueType(AMQDestination dest) + public void setLegacyFieldsForQueueType(AMQDestination dest) { // legacy support dest.setQueueName(new AMQShortString(dest.getAddressName())); @@ -1342,7 +1344,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { // legacy support dest.setExchangeName(new AMQShortString(dest.getAddressName())); - ExchangeNode node = (ExchangeNode)dest.getTargetNode(); + Node node = dest.getNode(); dest.setExchangeClass(node.getExchangeType() == null? ExchangeDefaults.TOPIC_EXCHANGE_CLASS: new AMQShortString(node.getExchangeType())); @@ -1436,5 +1438,144 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic flushTask = null; } } -} + private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException + { + Node node = dest.getNode(); + Map<String,Object> arguments = node.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + getQpidSession().queueDeclare(dest.getAddressName(), + node.getAlternateExchange(), arguments, + node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + node.isDurable() ? Option.DURABLE : Option.NONE, + node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + { + Node node = dest.getNode(); + sendExchangeDeclare(dest.getAddressName(), + node.getExchangeType(), + node.getAlternateExchange(), + node.getDeclareArgs(), + false, + node.isDurable(), + node.isAutoDelete()); + + // If bindings are specified without a queue name and is called by the producer, + // the broker will send an exception as expected. + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleLinkCreation(AMQDestination dest) throws AMQException + { + createBindings(dest, dest.getLink().getBindings()); + } + + void createBindings(AMQDestination dest, List<Binding> bindings) + { + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (Binding binding: bindings) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + getQpidSession().exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); + } + } + + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + getQpidSession().exchangeUnbind(queue, exchange, + binding.getBindingKey()); + } + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest, false)) + { + getQpidSession().queueDelete(dest.getQueueName()); + } + } + + void handleNodeDelete(AMQDestination dest) throws AMQException + { + if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) + { + if (isExchangeExist(dest,false)) + { + getQpidSession().exchangeDelete(dest.getAddressName()); + } + } + else + { + if (isQueueExist(dest,false)) + { + getQpidSession().queueDelete(dest.getAddressName()); + } + } + } +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index ccae5e31e5..3097b33da3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -695,10 +695,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } - public void handleAddressBasedDestination(AMQDestination dest, + public void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal, - boolean noWait) throws AMQException + boolean noLocal) throws AMQException { throw new UnsupportedOperationException("The new addressing based syntax is " + "not supported for AMQP 0-8/0-9 versions"); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index f09ef5e01d..51b6c7e478 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -114,8 +114,8 @@ public class AMQTopic extends AMQDestination implements Topic AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); // link is never null if dest was created using an address string. t.getLink().setName(queueName.asString()); - t.getSourceNode().setAutoDelete(false); - t.getSourceNode().setDurable(true); + t.getLink().getSubscriptionQueue().setAutoDelete(false); + t.getLink().setDurable(true); // The legacy fields are also populated just in case. t.setQueueName(queueName); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index f8e837cd34..b5e008da5a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -593,7 +593,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { sendCancel(); } - cleanupQueue(); } } catch (AMQException e) @@ -631,8 +630,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } abstract void sendCancel() throws AMQException, FailoverException; - - abstract void cleanupQueue() throws AMQException, FailoverException; /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ca5b1ac9c1..902770d901 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -95,6 +95,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _capacity = evaluateCapacity(destination); + // This is due to the Destination carrying the temporary subscription name which is incorrect. if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; @@ -163,6 +164,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM @Override void sendCancel() throws AMQException { _0_10session.getQpidSession().messageCancel(getConsumerTagString()); + postSubscription(); try { _0_10session.getQpidSession().sync(); @@ -499,7 +501,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - void cleanupQueue() throws AMQException, FailoverException + void postSubscription() throws AMQException { AMQDestination dest = this.getDestination(); if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) @@ -507,9 +509,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.RECEIVER ) { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - this.getDestination().getQueueName()); + ((AMQSession_0_10) getSession()).handleNodeDelete(dest); } + // Subscription queue is handled as part of linkDelete method. + ((AMQSession_0_10) getSession()).handleLinkDelete(dest); } } @@ -559,4 +562,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return capacity; } -} +}
\ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index fc7eacc760..f733e6bbca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -127,11 +127,6 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe return receive(); } - void cleanupQueue() throws AMQException, FailoverException - { - - } - public RejectBehaviour getRejectBehaviour() { return _rejectBehaviour; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 3b4f642d4c..f717ca4655 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -86,7 +86,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { try { - getSession().handleAddressBasedDestination(destination,false,false,false); + getSession().resolveAddress(destination,false,false); + ((AMQSession_0_10)getSession()).handleLinkCreation(destination); + ((AMQSession_0_10)getSession()).sync(); } catch(Exception e) { @@ -251,25 +253,35 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer return getSession().isQueueBound(destination); } + // We should have a close and closed method to distinguish between normal close + // and a close due to session or connection error. @Override public void close() throws JMSException { super.close(); AMQDestination dest = getAMQDestination(); - if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + AMQSession_0_10 ssn = (AMQSession_0_10) getSession(); + if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) + try { - try - { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - getAMQDestination().getQueueName()); - } - catch(TransportException e) + if (dest.getDelete() == AddressOption.ALWAYS || + dest.getDelete() == AddressOption.SENDER ) { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + ssn.handleNodeDelete(dest); } + ssn.handleLinkDelete(dest); + } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } + catch (AMQException e) + { + JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); + ex.setLinkedException(e); + ex.initCause(e); + throw ex; } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index a0c3914127..ee39ecd7fc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -344,7 +344,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd); if (type == AMQDestination.QUEUE_TYPE) { - ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd); + ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 318fe32d36..9b291b48f7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -20,21 +20,20 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.messaging.address.Link.Reliability; import org.apache.qpid.client.messaging.address.Link.Subscription; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; +import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.configuration.Accessor; import org.apache.qpid.configuration.Accessor.MapAccessor; import org.apache.qpid.messaging.Address; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - /** * Utility class for extracting information from the address class */ @@ -68,58 +67,56 @@ public class AddressHelper public static final String ARGUMENTS = "arguments"; public static final String RELIABILITY = "reliability"; - private Address address; - private Accessor addressProps; - private Accessor nodeProps; - private Accessor linkProps; + private Address _address; + private Accessor _addressPropAccess; + private Accessor _nodePropAccess; + private Accessor _linkPropAccess; + private Map _addressPropMap; + private Map _nodePropMap; + private Map _linkPropMap; public AddressHelper(Address address) { - this.address = address; - addressProps = new MapAccessor(address.getOptions()); - Map node_props = address.getOptions() == null + this._address = address; + this._addressPropMap = address.getOptions(); + this._addressPropAccess = new MapAccessor(_addressPropMap); + this._nodePropMap = address.getOptions() == null || address.getOptions().get(NODE) == null ? null : (Map) address.getOptions().get(NODE); - if (node_props != null) + if (_nodePropMap != null) { - nodeProps = new MapAccessor(node_props); + _nodePropAccess = new MapAccessor(_nodePropMap); } - Map link_props = address.getOptions() == null + this._linkPropMap = address.getOptions() == null || address.getOptions().get(LINK) == null ? null : (Map) address.getOptions().get(LINK); - if (link_props != null) + if (_linkPropMap != null) { - linkProps = new MapAccessor(link_props); + _linkPropAccess = new MapAccessor(_linkPropMap); } } public String getCreate() { - return addressProps.getString(CREATE); + return _addressPropAccess.getString(CREATE); } public String getAssert() { - return addressProps.getString(ASSERT); + return _addressPropAccess.getString(ASSERT); } public String getDelete() { - return addressProps.getString(DELETE); - } - - public boolean isNoLocal() - { - Boolean b = nodeProps.getBoolean(NO_LOCAL); - return b == null ? false : b; + return _addressPropAccess.getString(DELETE); } public boolean isBrowseOnly() { - String mode = addressProps.getString(MODE); + String mode = _addressPropAccess.getString(MODE); return mode != null && mode.equals(BROWSE) ? true : false; } @@ -127,7 +124,7 @@ public class AddressHelper public List<Binding> getBindings(Map props) { List<Binding> bindings = new ArrayList<Binding>(); - List<Map> bindingList = (List<Map>) props.get(X_BINDINGS); + List<Map> bindingList = (props == null) ? Collections.EMPTY_LIST : (List<Map>) props.get(X_BINDINGS); if (bindingList != null) { for (Map bindingMap : bindingList) @@ -157,117 +154,70 @@ public class AddressHelper } } - public int getTargetNodeType() throws Exception + public int getNodeType() throws Exception { - if (nodeProps == null || nodeProps.getString(TYPE) == null) + if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null) { // need to query and figure out return AMQDestination.UNKNOWN_TYPE; - } else if (nodeProps.getString(TYPE).equals("queue")) + } + else if (_nodePropAccess.getString(TYPE).equals("queue")) { return AMQDestination.QUEUE_TYPE; - } else if (nodeProps.getString(TYPE).equals("topic")) + } + else if (_nodePropAccess.getString(TYPE).equals("topic")) { return AMQDestination.TOPIC_TYPE; - } else + } + else { throw new Exception("unkown exchange type"); } } - public Node getTargetNode(int addressType) + public Node getNode() { - // target node here is the default exchange - if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE) - { - return new ExchangeNode(); - } else if (addressType == AMQDestination.TOPIC_TYPE) - { - Map node = (Map) address.getOptions().get(NODE); - return createExchangeNode(node); - } else + Node node = new Node(_address.getName()); + if (_nodePropAccess != null) { - // don't know yet - return null; - } - } - - private Node createExchangeNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - ExchangeNode node = new ExchangeNode(); - node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap - .getString(TYPE)); - fillInCommonNodeArgs(node, parent, argsMap); - return node; - } + Map xDeclareMap = getDeclareArgs(_nodePropMap); + MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); - private Node createQueueNode(Map parent) - { - Map declareArgs = getDeclareArgs(parent); - MapAccessor argsMap = new MapAccessor(declareArgs); - QueueNode node = new QueueNode(); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false - : argsMap.getBoolean(EXCLUSIVE)); - fillInCommonNodeArgs(node, parent, argsMap); - - return node; - } - - private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap) - { - node.setDurable(getDurability(parent)); - node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false - : argsMap.getBoolean(AUTO_DELETE)); - node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); - node.setBindings(getBindings(parent)); - if (getDeclareArgs(parent).containsKey(ARGUMENTS)) - { - node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS)); + node.setDurable(getBooleanProperty(_nodePropAccess,DURABLE,false)); + node.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,false)); + node.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,false)); + node.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); + if (xDeclareMapAccessor.getString(TYPE) != null) + { + node.setExchangeType(xDeclareMapAccessor.getString(TYPE)); + } + node.setBindings(getBindings(_nodePropMap)); + if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS)) + { + node.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS)); + } } - } - - private boolean getDurability(Map map) - { - Accessor access = new MapAccessor(map); - Boolean result = access.getBoolean(DURABLE); - return (result == null) ? false : result.booleanValue(); + return node; } - /** - * if the type == queue x-declare args from the node props is used. if the - * type == exchange x-declare args from the link props is used else just - * create a default temp queue. - */ - public Node getSourceNode(int addressType) + // This should really be in the Accessor interface + private boolean getBooleanProperty(Accessor access, String propName, boolean defaultValue) { - if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) - { - return createQueueNode((Map) address.getOptions().get(NODE)); - } - if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null) - { - return createQueueNode((Map) address.getOptions().get(LINK)); - } else - { - // need to query the info - return new QueueNode(); - } + Boolean result = access.getBoolean(propName); + return (result == null) ? defaultValue : result.booleanValue(); } public Link getLink() throws Exception { Link link = new Link(); link.setSubscription(new Subscription()); - if (linkProps != null) + link.setSubscriptionQueue(new SubscriptionQueue()); + if (_linkPropAccess != null) { - link.setDurable(linkProps.getBoolean(DURABLE) == null ? false - : linkProps.getBoolean(DURABLE)); - link.setName(linkProps.getString(NAME)); + link.setDurable(getBooleanProperty(_linkPropAccess,DURABLE,false)); + link.setName(_linkPropAccess.getString(NAME)); - String reliability = linkProps.getString(RELIABILITY); + String reliability = _linkPropAccess.getString(RELIABILITY); if ( reliability != null) { if (reliability.equalsIgnoreCase("unreliable")) @@ -283,13 +233,12 @@ public class AddressHelper throw new Exception("The reliability mode '" + reliability + "' is not yet supported"); } - } - if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) + if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map) { MapAccessor capacityProps = new MapAccessor( - (Map) ((Map) address.getOptions().get(LINK)) + (Map) ((Map) _address.getOptions().get(LINK)) .get(CAPACITY)); link .setConsumerCapacity(capacityProps @@ -302,17 +251,19 @@ public class AddressHelper } else { - int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps + int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess .getInt(CAPACITY); link.setConsumerCapacity(cap); link.setProducerCapacity(cap); } - link.setFilter(linkProps.getString(FILTER)); + link.setFilter(_linkPropAccess.getString(FILTER)); // so far filter type not used - if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE)) + Map linkMap = (Map) _address.getOptions().get(LINK); + + if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE)) { - Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE); + Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE); if (x_subscribe.containsKey(ARGUMENTS)) { @@ -324,6 +275,18 @@ public class AddressHelper link.getSubscription().setExclusive(exclusive); } + + link.setBindings(getBindings(linkMap)); + Map xDeclareMap = getDeclareArgs(linkMap); + SubscriptionQueue queue = link.getSubscriptionQueue(); + if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS)) + { + MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); + queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true)); + queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true)); + queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); + queue.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS)); + } } return link; diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index 41f6725c8f..40a84ebd02 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.client.messaging.address; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.qpid.client.AMQDestination.Binding; + public class Link { public enum FilterType { SQL92, XQUERY, SUBJECT } @@ -36,10 +41,11 @@ public class Link private boolean _isDurable; private int _consumerCapacity = 0; private int _producerCapacity = 0; - private Node node; private Subscription subscription; private Reliability reliability = Reliability.AT_LEAST_ONCE; - + private List<Binding> _bindings = new ArrayList<Binding>(); + private SubscriptionQueue _subscriptionQueue; + public Reliability getReliability() { return reliability; @@ -50,21 +56,11 @@ public class Link this.reliability = reliability; } - public Node getNode() - { - return node; - } - - public void setNode(Node node) - { - this.node = node; - } - public boolean isDurable() { return _isDurable; } - + public void setDurable(boolean durable) { _isDurable = durable; @@ -139,6 +135,74 @@ public class Link { this.subscription = subscription; } + + public List<Binding> getBindings() + { + return _bindings; + } + + public void setBindings(List<Binding> bindings) + { + _bindings = bindings; + } + + public SubscriptionQueue getSubscriptionQueue() + { + return _subscriptionQueue; + } + + public void setSubscriptionQueue(SubscriptionQueue subscriptionQueue) + { + this._subscriptionQueue = subscriptionQueue; + } + + public static class SubscriptionQueue + { + private Map<String,Object> _declareArgs = new HashMap<String,Object>(); + private boolean _isAutoDelete = true; + private boolean _isExclusive = true; + private String _alternateExchange; + + public Map<String,Object> getDeclareArgs() + { + return _declareArgs; + } + + public void setDeclareArgs(Map<String,Object> options) + { + _declareArgs = options; + } + + public boolean isAutoDelete() + { + return _isAutoDelete; + } + + public void setAutoDelete(boolean autoDelete) + { + _isAutoDelete = autoDelete; + } + + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + + public String getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(String altExchange) + { + _alternateExchange = altExchange; + } + } public static class Subscription { diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java index 0da0327885..005f98f344 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -26,19 +26,33 @@ import org.apache.qpid.client.AMQDestination.Binding; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -public abstract class Node +public class Node { private int _nodeType = AMQDestination.UNKNOWN_TYPE; + private String _name; private boolean _isDurable; private boolean _isAutoDelete; + private boolean _isExclusive; private String _alternateExchange; + private String _exchangeType = "topic"; // used when node is an exchange instead of a queue. private List<Binding> _bindings = new ArrayList<Binding>(); - private Map<String,Object> _declareArgs = Collections.emptyMap(); + private Map<String,Object> _declareArgs = new HashMap<String,Object>(); - protected Node(int nodeType) + protected Node(String name) + { + _name = name; + } + + public String getName() + { + return _name; + } + + public void setNodeType(int nodeType) { _nodeType = nodeType; } @@ -58,6 +72,16 @@ public abstract class Node _isDurable = durable; } + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + public boolean isAutoDelete() { return _isAutoDelete; @@ -100,56 +124,15 @@ public abstract class Node public void setDeclareArgs(Map<String,Object> options) { _declareArgs = options; - } - - public static class QueueNode extends Node - { - private boolean _isExclusive; - private QpidQueueOptions _queueOptions = new QpidQueueOptions(); - - public QueueNode() - { - super(AMQDestination.QUEUE_TYPE); - } - - public boolean isExclusive() - { - return _isExclusive; - } - - public void setExclusive(boolean exclusive) - { - _isExclusive = exclusive; - } } - - public static class ExchangeNode extends Node - { - private QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); - private String _exchangeType; - - public ExchangeNode() - { - super(AMQDestination.TOPIC_TYPE); - } - - public String getExchangeType() - { - return _exchangeType; - } - - public void setExchangeType(String exchangeType) - { - _exchangeType = exchangeType; - } - + + public void setExchangeType(String type) + { + _exchangeType = type; } - - public static class UnknownNodeType extends Node + + public String getExchangeType() { - public UnknownNodeType() - { - super(AMQDestination.UNKNOWN_TYPE); - } + return _exchangeType; } } diff --git a/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java b/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java new file mode 100644 index 0000000000..a602dcbfd4 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java @@ -0,0 +1,126 @@ +package org.apache.qpid.client.messaging.address; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQDestination.AddressOption; +import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Link.Reliability; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.test.utils.QpidTestCase; + +public class AddressHelperTest extends QpidTestCase +{ + public void testAddressOptions() throws Exception + { + Address addr = Address.parse("queue/test;{create:sender, assert:always, delete:receiver, mode:browse}"); + AddressHelper helper = new AddressHelper(addr); + assertEquals(AddressOption.SENDER,AddressOption.getOption(helper.getCreate())); + assertEquals(AddressOption.ALWAYS,AddressOption.getOption(helper.getAssert())); + assertEquals(AddressOption.RECEIVER,AddressOption.getOption(helper.getDelete())); + assertTrue("'mode' option wasn't read properly",helper.isBrowseOnly()); + } + + public void testNodeProperties() throws Exception + { + Address addr = Address.parse("my-queue;{" + + "node: " + + "{" + + "type: queue ," + + "durable: true ," + + "x-declare: " + + "{" + + "exclusive: true," + + "auto-delete: true," + + "alternate-exchange: 'amq.fanout'," + + "arguments: {" + + "'qpid.max_size': 1000," + + "'qpid.max_count': 100" + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', queue:my-queue, key : test}, " + + "{exchange : 'amq.fanout', queue:my-queue}," + + "{exchange: 'amq.match', queue:my-queue, arguments: {x-match: any, dep: sales, loc: CA}}," + + "{exchange : 'amq.topic',queue:my-queue, key : 'a.#'}" + + "]" + + + "}" + + "}"); + AddressHelper helper = new AddressHelper(addr); + Node node = helper.getNode(); + assertEquals("'type' property wasn't read properly",AMQDestination.QUEUE_TYPE,helper.getNodeType()); + assertTrue("'durable' property wasn't read properly",node.isDurable()); + assertTrue("'auto-delete' property wasn't read properly",node.isAutoDelete()); + assertTrue("'exclusive' property wasn't read properly",node.isExclusive()); + assertEquals("'alternate-exchange' property wasn't read properly","amq.fanout",node.getAlternateExchange()); + assertEquals("'arguments' in 'x-declare' property wasn't read properly",2,node.getDeclareArgs().size()); + assertEquals("'bindings' property wasn't read properly",4,node.getBindings().size()); + for (Binding binding: node.getBindings()) + { + assertTrue("property 'exchange' in bindings wasn't read properly",binding.getExchange().startsWith("amq.")); + assertEquals("property 'queue' in bindings wasn't read properly","my-queue",binding.getQueue()); + if (binding.getExchange().equals("amq.direct")) + { + assertEquals("'key' property in bindings wasn't read properly","test",binding.getBindingKey()); + } + if (binding.getExchange().equals("amq.match")) + { + assertEquals("'arguments' property in bindings wasn't read properly",3,binding.getArgs().size()); + } + } + } + + public void testLinkProperties() throws Exception + { + Address addr = Address.parse("my-queue;{" + + "link: " + + "{" + + "name: my-queue ," + + "durable: true ," + + "reliability: at-least-once," + + "capacity: {source:10, target:15}," + + "x-declare: " + + "{" + + "exclusive: true," + + "auto-delete: true," + + "alternate-exchange: 'amq.fanout'," + + "arguments: {" + + "'qpid.max_size': 1000," + + "'qpid.max_count': 100" + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', queue:my-queue, key : test}, " + + "{exchange : 'amq.fanout', queue:my-queue}," + + "{exchange: 'amq.match', queue:my-queue, arguments: {x-match: any, dep: sales, loc: CA}}," + + "{exchange : 'amq.topic',queue:my-queue, key : 'a.#'}" + + "]," + + "x-subscribes:{exclusive: true, arguments: {a:b,x:y}}" + + "}" + + "}"); + + AddressHelper helper = new AddressHelper(addr); + Link link = helper.getLink(); + assertEquals("'name' property wasn't read properly","my-queue",link.getName()); + assertTrue("'durable' property wasn't read properly",link.isDurable()); + assertEquals("'reliability' property wasn't read properly",Reliability.AT_LEAST_ONCE,link.getReliability()); + assertTrue("'auto-delete' property in 'x-declare' wasn't read properly",link.getSubscriptionQueue().isAutoDelete()); + assertTrue("'exclusive' property in 'x-declare' wasn't read properly",link.getSubscriptionQueue().isExclusive()); + assertEquals("'alternate-exchange' property in 'x-declare' wasn't read properly","amq.fanout",link.getSubscriptionQueue().getAlternateExchange()); + assertEquals("'arguments' in 'x-declare' property wasn't read properly",2,link.getSubscriptionQueue().getDeclareArgs().size()); + assertEquals("'bindings' property wasn't read properly",4,link.getBindings().size()); + for (Binding binding: link.getBindings()) + { + assertTrue("property 'exchange' in bindings wasn't read properly",binding.getExchange().startsWith("amq.")); + assertEquals("property 'queue' in bindings wasn't read properly","my-queue",binding.getQueue()); + if (binding.getExchange().equals("amq.direct")) + { + assertEquals("'key' property in bindings wasn't read properly","test",binding.getBindingKey()); + } + if (binding.getExchange().equals("amq.match")) + { + assertEquals("'arguments' property in bindings wasn't read properly",3,binding.getArgs().size()); + } + } + assertTrue("'exclusive' property in 'x-subscribe' wasn't read properly",link.getSubscription().isExclusive()); + assertEquals("'arguments' in 'x-subscribe' property wasn't read properly",2,link.getSubscription().getArgs().size()); + } + +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index e1f93b975b..08ee70f072 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -29,8 +29,6 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.message.QpidMessageProperties; -import org.apache.qpid.client.messaging.address.Node.ExchangeNode; -import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.messaging.Address; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -98,7 +96,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest,true)); // create always ------------------------------------------- @@ -107,10 +105,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); + dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs())); // create receiver ----------------------------------------- addr1 = "ADDR:testQueue2; { create: receiver }"; @@ -126,16 +124,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); + dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs())); // create never -------------------------------------------- addr1 = "ADDR:testQueue3; { create: never }"; @@ -161,7 +159,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); // create sender ------------------------------------------ addr1 = "ADDR:testQueue3; { create: sender }"; @@ -177,14 +175,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "doesn't resolve to an exchange or a queue")); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); prod = jmsSession.createProducer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); + dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs())); } @@ -221,7 +219,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // Even if the consumer is closed the queue and the bindings should be intact. assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", @@ -326,7 +324,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertTrue("Exchange not created as expected",( - (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true)); + (AMQSession_0_10)jmsSession).isExchangeExist(dest,true)); // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( @@ -367,7 +365,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception { assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", @@ -506,14 +504,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons3 = jmsSession.createConsumer(dest3); assertTrue("Destination1 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest1, true)); assertTrue("Destination1 was not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", dest1.getAddressName(),dest1.getAddressName(), null)); assertTrue("Destination2 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest2,true)); assertTrue("Destination2 was not bound as expected",( (AMQSession_0_10)jmsSession).isQueueBound("", @@ -602,14 +600,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons.close(); // Using the ADDR method to create a more complicated queue - String addr = "ADDR:amq.direct/x512; {create: receiver, " + + String addr = "ADDR:amq.direct/x512; {" + "link : {name : 'MY.RESP.QUEUE', " + "x-declare : { auto-delete: true, exclusive: true, " + "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }"; queue = ssn.createQueue(addr); - prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); + prod = ssn.createProducer(queue); assertTrue("MY.RESP.QUEUE was not created as expected",( (AMQSession_0_10)ssn).isQueueBound("amq.direct", "MY.RESP.QUEUE","x512", null)); @@ -677,8 +675,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // Using the ADDR method to create a more complicated topic topic = ssn.createTopic(addr); - prod = ssn.createProducer(topic); cons = ssn.createConsumer(topic); + prod = ssn.createProducer(topic); assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( (AMQSession_0_10)ssn).isQueueBound("vehicles", @@ -840,7 +838,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "}"; // Using the ADDR method to create a more complicated topic - MessageConsumer cons = ssn.createConsumer(new AMQAnyDestination(addr)); + Topic topic = ssn.createTopic(addr); + MessageConsumer cons = ssn.createConsumer(topic); assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",( (AMQSession_0_10)ssn).isQueueBound("MRKT", @@ -854,7 +853,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","CNTL.#", null)); - MessageProducer prod = ssn.createProducer(ssn.createTopic(addr)); + MessageProducer prod = ssn.createProducer(topic); Message msg = ssn.createTextMessage("test"); msg.setStringProperty("qpid.subject", "NASDAQ.ABCD"); prod.send(msg); @@ -909,32 +908,31 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + String bindingStr = "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}"; + Properties props = new Properties(); props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); - props.setProperty("destination.address1", "ADDR:amq.topic"); - props.setProperty("destination.address2", "ADDR:amq.direct/test"); - String addrStr = "ADDR:amq.topic/test; {link:{name: my-topic," + - "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}"; - props.setProperty("destination.address3", addrStr); - props.setProperty("topic.address4", "hello.world"); - addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; + props.setProperty("destination.address1", "ADDR:amq.topic/test"); + props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr); + props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr); + String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; props.setProperty("destination.address5", addrStr); Context ctx = new InitialContext(props); - for (int i=1; i < 5; i++) + for (int i=1; i < 4; i++) { Topic topic = (Topic) ctx.lookup("address"+i); - createDurableSubscriber(ctx,ssn,"address"+i,topic); + createDurableSubscriber(ctx,ssn,"address"+i,topic,"ADDR:amq.topic/test"); } Topic topic = ssn.createTopic("ADDR:news.us"); - createDurableSubscriber(ctx,ssn,"my-dest",topic); + createDurableSubscriber(ctx,ssn,"my-dest",topic,"ADDR:news.us"); Topic namedQueue = (Topic) ctx.lookup("address5"); try { - createDurableSubscriber(ctx,ssn,"my-queue",namedQueue); + createDurableSubscriber(ctx,ssn,"my-queue",namedQueue,"ADDR:amq.topic/test"); fail("Exception should be thrown. Durable subscribers cannot be created for Queues"); } catch(JMSException e) @@ -944,15 +942,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } } - private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic) throws Exception + private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception { MessageConsumer cons = ssn.createDurableSubscriber(topic, destName); - MessageProducer prod = ssn.createProducer(topic); + MessageProducer prod = ssn.createProducer(ssn.createTopic(producerAddr)); Message m = ssn.createTextMessage(destName); prod.send(m); Message msg = cons.receive(1000); - assertNotNull(msg); + assertNotNull("Message not received as expected when using Topic : " + topic,msg); assertEquals(destName,((TextMessage)msg).getText()); ssn.unsubscribe(destName); } @@ -977,7 +975,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; @@ -993,7 +991,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; @@ -1010,9 +1008,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - - + (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); } /** @@ -1307,4 +1303,56 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("message should be re-received by consumer after rollback", receivedMessage); jmsSession.commit(); } + + /** + * Test Goals : + * + * 1. Verify that link bindings are created and destroyed after creating and closing a subscriber. + * 2. Verify that link bindings are created and destroyed after creating and closing a subscriber. + */ + public void testLinkBindingBehavior() throws Exception + { + Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + String addr = "ADDR:my-queue; {create: always, " + + "link: " + + "{" + + "x-bindings: [{exchange : 'amq.direct', key : test}]," + + "}" + + "}"; + + AMQDestination dest = (AMQDestination)jmsSession.createQueue(addr); + MessageConsumer cons = jmsSession.createConsumer(dest); + AMQSession_0_10 ssn = (AMQSession_0_10)jmsSession; + + assertTrue("Queue not created as expected",ssn.isQueueExist(dest, true)); + assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null)); + + cons.close(); // closing consumer, link binding should be removed now. + assertTrue("Queue should still be there",ssn.isQueueExist(dest, true)); + assertFalse("Binding should not exist anymore",ssn.isQueueBound("amq.direct","my-queue","test", null)); + + MessageProducer prod = jmsSession.createProducer(dest); + assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null)); + prod.close(); + assertFalse("Binding should not exist anymore",ssn.isQueueBound("amq.direct","my-queue","test", null)); + } + + /** + * Test Goals : Verifies that the subscription queue created is as specified under link properties. + */ + public void testCustomizingSubscriptionQueue() throws Exception + { + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + String xDeclareArgs = "x-declare: { exclusive: false, auto-delete: false," + + "alternate-exchange: 'amq.fanout'," + + "arguments: {'qpid.max_size': 1000,'qpid.max_count': 100}" + + "}"; + + String addr = "ADDR:amq.topic/test; {link: {name:my-queue, durable:true," + xDeclareArgs + "}}"; + MessageConsumer cons = ssn.createConsumer(ssn.createTopic(addr)); + + String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}"; + AMQDestination verifyDest = (AMQDestination)ssn.createQueue(verifyAddr); + ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true); + } } |
