summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java49
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java303
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java34
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java203
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java90
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java85
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java126
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java130
15 files changed, 678 insertions, 382 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 096738f9ad..f14b6d810b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -112,16 +112,6 @@ public abstract class AMQDestination implements Destination, Referenceable
_name = name;
}
- protected Link getTargetLink()
- {
- return _targetLink;
- }
-
- protected void setTargetLink(Link targetLink)
- {
- _targetLink = targetLink;
- }
-
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -186,9 +176,7 @@ public abstract class AMQDestination implements Destination, Referenceable
private AddressOption _assert = AddressOption.NEVER;
private AddressOption _delete = AddressOption.NEVER;
- private Node _targetNode;
- private Node _sourceNode;
- private Link _targetLink;
+ private Node _node;
private Link _link;
@@ -823,24 +811,14 @@ public abstract class AMQDestination implements Destination, Referenceable
_delete = option;
}
- public Node getTargetNode()
+ public Node getNode()
{
- return _targetNode;
+ return _node;
}
- public void setTargetNode(Node node)
+ public void setNode(Node node)
{
- _targetNode = node;
- }
-
- public Node getSourceNode()
- {
- return _sourceNode;
- }
-
- public void setSourceNode(Node node)
- {
- _sourceNode = node;
+ _node = node;
}
public Link getLink()
@@ -901,21 +879,11 @@ public abstract class AMQDestination implements Destination, Referenceable
_browseOnly = _addrHelper.isBrowseOnly();
- _addressType = _addrHelper.getTargetNodeType();
- _targetNode = _addrHelper.getTargetNode(_addressType);
- _sourceNode = _addrHelper.getSourceNode(_addressType);
+ _addressType = _addrHelper.getNodeType();
+ _node = _addrHelper.getNode();
_link = _addrHelper.getLink();
}
- // This method is needed if we didn't know the node type at the beginning.
- // Therefore we have to query the broker to figure out the type.
- // Once the type is known we look for the necessary properties.
- public void rebuildTargetAndSourceNodes(int addressType)
- {
- _targetNode = _addrHelper.getTargetNode(addressType);
- _sourceNode = _addrHelper.getSourceNode(addressType);
- }
-
// ----- / new address syntax -----------
public boolean isBrowseOnly()
@@ -950,8 +918,7 @@ public abstract class AMQDestination implements Destination, Referenceable
dest.setDelete(_delete);
dest.setBrowseOnly(_browseOnly);
dest.setAddressType(_addressType);
- dest.setTargetNode(_targetNode);
- dest.setSourceNode(_sourceNode);
+ dest.setNode(_node);
dest.setLink(_link);
dest.setAddressResolved(_addressResolved.get());
return dest;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 49964639e4..91a6389214 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -999,12 +999,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
try
{
- handleAddressBasedDestination(dest,false,noLocal,true);
+ resolveAddress(dest,false,noLocal);
if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
{
throw new JMSException("Durable subscribers can only be created for Topics");
}
- dest.getSourceNode().setDurable(true);
}
catch(AMQException e)
{
@@ -2840,7 +2839,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (amqd.getDestSyntax() == DestSyntax.ADDR)
{
- handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait);
+ resolveAddress(amqd,true,consumer.isNoLocal());
}
else
{
@@ -2899,10 +2898,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- public abstract void handleAddressBasedDestination(AMQDestination dest,
+ public abstract void resolveAddress(AMQDestination dest,
boolean isConsumer,
- boolean noLocal,
- boolean noWait) throws AMQException;
+ boolean noLocal) throws AMQException;
private void registerProducer(long producerId, MessageProducer producer)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index e5f3e33d15..e271436c21 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -17,6 +17,11 @@
*/
package org.apache.qpid.client;
+import static org.apache.qpid.transport.Option.BATCH;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,8 +34,10 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+
import javax.jms.Destination;
import javax.jms.JMSException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
@@ -44,18 +51,31 @@ import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.client.messaging.address.AddressHelper;
import org.apache.qpid.client.messaging.address.Link;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
+import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
+import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.*;
-
-import static org.apache.qpid.transport.Option.BATCH;
-import static org.apache.qpid.transport.Option.NONE;
-import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.QueueQueryResult;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
@@ -347,9 +367,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
+ // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
List<Binding> bindings = new ArrayList<Binding>();
- bindings.addAll(destination.getSourceNode().getBindings());
- bindings.addAll(destination.getTargetNode().getBindings());
+ bindings.addAll(destination.getNode().getBindings());
String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
destination.getAddressName(): "amq.topic";
@@ -599,7 +619,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName,
boolean nowait, int tag)
throws AMQException, FailoverException
- {
+ {
+ if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
+ {
+ if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
+ {
+ createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal());
+ queueName = consumer.getDestination().getAMQQueueName();
+ consumer.setQueuename(queueName);
+ }
+ handleLinkCreation(consumer.getDestination());
+ }
boolean preAcquire = consumer.isPreAcquire();
AMQDestination destination = consumer.getDestination();
@@ -642,11 +672,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
capacity,
Option.UNRELIABLE);
}
-
- if (!nowait)
- {
- sync();
- }
+ sync();
}
/**
@@ -753,7 +779,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
- QueueNode node = (QueueNode)amqd.getSourceNode();
+ // This code is here to ensure address based destination work with the declareQueue public method in AMQSession.java
+ Node node = amqd.getNode();
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs());
if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null)
@@ -1065,11 +1092,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return AMQMessageDelegateFactory.FACTORY_0_10;
}
- public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode)
+ public boolean isExchangeExist(AMQDestination dest,boolean assertNode)
{
boolean match = true;
ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
match = !result.getNotFound();
+ Node node = dest.getNode();
if (match)
{
@@ -1079,16 +1107,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
(node.getExchangeType() != null &&
node.getExchangeType().equals(result.getType())) &&
(matchProps(result.getArguments(),node.getDeclareArgs()));
- }
- else if (node.getExchangeType() != null)
- {
- // even if assert is false, better to verify this
- match = node.getExchangeType().equals(result.getType());
- if (!match)
- {
- _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() +
- " actual " + result.getType());
- }
}
else
{
@@ -1097,18 +1115,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
dest.setExchangeClass(new AMQShortString(result.getType()));
}
}
-
return match;
}
- public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
+ public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
{
boolean match = true;
try
{
QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
match = dest.getAddressName().equals(result.getQueue());
-
+ Node node = dest.getNode();
+
if (match && assertNode)
{
match = (result.getDurable() == node.isDurable()) &&
@@ -1133,7 +1151,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
"Error querying queue",e);
}
}
-
return match;
}
@@ -1172,17 +1189,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
@SuppressWarnings("deprecation")
- public void handleAddressBasedDestination(AMQDestination dest,
+ public void resolveAddress(AMQDestination dest,
boolean isConsumer,
- boolean noLocal,
- boolean noWait) throws AMQException
+ boolean noLocal) throws AMQException
{
if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
{
- if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
- {
- createSubscriptionQueue(dest,noLocal);
- }
+ return;
}
else
{
@@ -1202,50 +1215,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
case AMQDestination.QUEUE_TYPE:
{
- if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode))
+ if (isQueueExist(dest,assertNode))
{
- setLegacyFiledsForQueueType(dest);
+ setLegacyFieldsForQueueType(dest);
break;
}
else if(createNode)
{
- setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,noLocal,noWait,false);
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
+ setLegacyFieldsForQueueType(dest);
+ handleQueueNodeCreation(dest,noLocal);
break;
}
}
case AMQDestination.TOPIC_TYPE:
{
- if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode))
+ if (isExchangeExist(dest,assertNode))
{
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
- if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
- {
- createSubscriptionQueue(dest, noLocal);
- }
break;
}
else if(createNode)
{
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
- sendExchangeDeclare(dest.getAddressName(),
- dest.getExchangeClass().asString(),
- dest.getTargetNode().getAlternateExchange(),
- dest.getTargetNode().getDeclareArgs(),
- false,
- dest.getTargetNode().isDurable(),
- dest.getTargetNode().isAutoDelete());
- if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true))
- {
- createSubscriptionQueue(dest,noLocal);
- }
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
+ handleExchangeNodeCreation(dest);
break;
}
}
@@ -1284,7 +1279,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
throw new AMQException("Ambiguous address, please specify queue or topic as node type");
}
dest.setAddressType(type);
- dest.rebuildTargetAndSourceNodes(type);
return type;
}
}
@@ -1307,29 +1301,37 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException
+ void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException
{
- QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null
-
- if (dest.getQueueName() == null)
+ Link link = dest.getLink();
+ String queueName = dest.getQueueName();
+
+ if (queueName == null)
{
- if (dest.getLink() != null && dest.getLink().getName() != null)
- {
- dest.setQueueName(new AMQShortString(dest.getLink().getName()));
- }
+ queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName();
+ dest.setQueueName(new AMQShortString(queueName));
}
- node.setExclusive(true);
- node.setAutoDelete(!node.isDurable());
- send0_10QueueDeclare(dest,noLocal,true,false);
- getQpidSession().exchangeBind(dest.getQueueName(),
+
+ SubscriptionQueue queueProps = link.getSubscriptionQueue();
+ Map<String,Object> arguments = queueProps.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+
+ getQpidSession().queueDeclare(queueName,
+ queueProps.getAlternateExchange(), arguments,
+ queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ link.isDurable() ? Option.DURABLE : Option.NONE,
+ queueProps.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
+ getQpidSession().exchangeBind(queueName,
dest.getAddressName(),
dest.getSubject(),
Collections.<String,Object>emptyMap());
- sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
- null,dest.getExchangeName(),dest, false);
}
- public void setLegacyFiledsForQueueType(AMQDestination dest)
+ public void setLegacyFieldsForQueueType(AMQDestination dest)
{
// legacy support
dest.setQueueName(new AMQShortString(dest.getAddressName()));
@@ -1342,7 +1344,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
// legacy support
dest.setExchangeName(new AMQShortString(dest.getAddressName()));
- ExchangeNode node = (ExchangeNode)dest.getTargetNode();
+ Node node = dest.getNode();
dest.setExchangeClass(node.getExchangeType() == null?
ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
new AMQShortString(node.getExchangeType()));
@@ -1436,5 +1438,144 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
flushTask = null;
}
}
-}
+ private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
+ {
+ Node node = dest.getNode();
+ Map<String,Object> arguments = node.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+ getQpidSession().queueDeclare(dest.getAddressName(),
+ node.getAlternateExchange(), arguments,
+ node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ node.isDurable() ? Option.DURABLE : Option.NONE,
+ node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+ void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
+ {
+ Node node = dest.getNode();
+ sendExchangeDeclare(dest.getAddressName(),
+ node.getExchangeType(),
+ node.getAlternateExchange(),
+ node.getDeclareArgs(),
+ false,
+ node.isDurable(),
+ node.isAutoDelete());
+
+ // If bindings are specified without a queue name and is called by the producer,
+ // the broker will send an exception as expected.
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+ void handleLinkCreation(AMQDestination dest) throws AMQException
+ {
+ createBindings(dest, dest.getLink().getBindings());
+ }
+
+ void createBindings(AMQDestination dest, List<Binding> bindings)
+ {
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (Binding binding: bindings)
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ getQpidSession().exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
+ }
+ }
+
+ void handleLinkDelete(AMQDestination dest) throws AMQException
+ {
+ // We need to destroy link bindings
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (Binding binding: dest.getLink().getBindings())
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Unbinding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ getQpidSession().exchangeUnbind(queue, exchange,
+ binding.getBindingKey());
+ }
+ // We need to delete the subscription queue.
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
+ dest.getLink().getSubscriptionQueue().isExclusive() &&
+ isQueueExist(dest, false))
+ {
+ getQpidSession().queueDelete(dest.getQueueName());
+ }
+ }
+
+ void handleNodeDelete(AMQDestination dest) throws AMQException
+ {
+ if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
+ {
+ if (isExchangeExist(dest,false))
+ {
+ getQpidSession().exchangeDelete(dest.getAddressName());
+ }
+ }
+ else
+ {
+ if (isQueueExist(dest,false))
+ {
+ getQpidSession().queueDelete(dest.getAddressName());
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index ccae5e31e5..3097b33da3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -695,10 +695,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
}
- public void handleAddressBasedDestination(AMQDestination dest,
+ public void resolveAddress(AMQDestination dest,
boolean isConsumer,
- boolean noLocal,
- boolean noWait) throws AMQException
+ boolean noLocal) throws AMQException
{
throw new UnsupportedOperationException("The new addressing based syntax is "
+ "not supported for AMQP 0-8/0-9 versions");
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index f09ef5e01d..51b6c7e478 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -114,8 +114,8 @@ public class AMQTopic extends AMQDestination implements Topic
AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
// link is never null if dest was created using an address string.
t.getLink().setName(queueName.asString());
- t.getSourceNode().setAutoDelete(false);
- t.getSourceNode().setDurable(true);
+ t.getLink().getSubscriptionQueue().setAutoDelete(false);
+ t.getLink().setDurable(true);
// The legacy fields are also populated just in case.
t.setQueueName(queueName);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index f8e837cd34..b5e008da5a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -593,7 +593,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
sendCancel();
}
- cleanupQueue();
}
}
catch (AMQException e)
@@ -631,8 +630,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
abstract void sendCancel() throws AMQException, FailoverException;
-
- abstract void cleanupQueue() throws AMQException, FailoverException;
/**
* Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index ca5b1ac9c1..902770d901 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -95,6 +95,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_capacity = evaluateCapacity(destination);
+ // This is due to the Destination carrying the temporary subscription name which is incorrect.
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ;
@@ -163,6 +164,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
@Override void sendCancel() throws AMQException
{
_0_10session.getQpidSession().messageCancel(getConsumerTagString());
+ postSubscription();
try
{
_0_10session.getQpidSession().sync();
@@ -499,7 +501,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
- void cleanupQueue() throws AMQException, FailoverException
+ void postSubscription() throws AMQException
{
AMQDestination dest = this.getDestination();
if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
@@ -507,9 +509,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.RECEIVER )
{
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- this.getDestination().getQueueName());
+ ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
}
+ // Subscription queue is handled as part of linkDelete method.
+ ((AMQSession_0_10) getSession()).handleLinkDelete(dest);
}
}
@@ -559,4 +562,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return capacity;
}
-}
+} \ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index fc7eacc760..f733e6bbca 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -127,11 +127,6 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
return receive();
}
- void cleanupQueue() throws AMQException, FailoverException
- {
-
- }
-
public RejectBehaviour getRejectBehaviour()
{
return _rejectBehaviour;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 3b4f642d4c..f717ca4655 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -86,7 +86,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
try
{
- getSession().handleAddressBasedDestination(destination,false,false,false);
+ getSession().resolveAddress(destination,false,false);
+ ((AMQSession_0_10)getSession()).handleLinkCreation(destination);
+ ((AMQSession_0_10)getSession()).sync();
}
catch(Exception e)
{
@@ -251,25 +253,35 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
return getSession().isQueueBound(destination);
}
+ // We should have a close and closed method to distinguish between normal close
+ // and a close due to session or connection error.
@Override
public void close() throws JMSException
{
super.close();
AMQDestination dest = getAMQDestination();
- if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ AMQSession_0_10 ssn = (AMQSession_0_10) getSession();
+ if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.SENDER )
+ try
{
- try
- {
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- getAMQDestination().getQueueName());
- }
- catch(TransportException e)
+ if (dest.getDelete() == AddressOption.ALWAYS ||
+ dest.getDelete() == AddressOption.SENDER )
{
- throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ ssn.handleNodeDelete(dest);
}
+ ssn.handleLinkDelete(dest);
+ }
+ catch(TransportException e)
+ {
+ throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ }
+ catch (AMQException e)
+ {
+ JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
+ ex.setLinkedException(e);
+ ex.initCause(e);
+ throw ex;
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index a0c3914127..ee39ecd7fc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -344,7 +344,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
if (type == AMQDestination.QUEUE_TYPE)
{
- ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForQueueType(amqd);
+ ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd);
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 318fe32d36..9b291b48f7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -20,21 +20,20 @@
*/
package org.apache.qpid.client.messaging.address;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Link.Subscription;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
+import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
import org.apache.qpid.configuration.Accessor;
import org.apache.qpid.configuration.Accessor.MapAccessor;
import org.apache.qpid.messaging.Address;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
/**
* Utility class for extracting information from the address class
*/
@@ -68,58 +67,56 @@ public class AddressHelper
public static final String ARGUMENTS = "arguments";
public static final String RELIABILITY = "reliability";
- private Address address;
- private Accessor addressProps;
- private Accessor nodeProps;
- private Accessor linkProps;
+ private Address _address;
+ private Accessor _addressPropAccess;
+ private Accessor _nodePropAccess;
+ private Accessor _linkPropAccess;
+ private Map _addressPropMap;
+ private Map _nodePropMap;
+ private Map _linkPropMap;
public AddressHelper(Address address)
{
- this.address = address;
- addressProps = new MapAccessor(address.getOptions());
- Map node_props = address.getOptions() == null
+ this._address = address;
+ this._addressPropMap = address.getOptions();
+ this._addressPropAccess = new MapAccessor(_addressPropMap);
+ this._nodePropMap = address.getOptions() == null
|| address.getOptions().get(NODE) == null ? null
: (Map) address.getOptions().get(NODE);
- if (node_props != null)
+ if (_nodePropMap != null)
{
- nodeProps = new MapAccessor(node_props);
+ _nodePropAccess = new MapAccessor(_nodePropMap);
}
- Map link_props = address.getOptions() == null
+ this._linkPropMap = address.getOptions() == null
|| address.getOptions().get(LINK) == null ? null
: (Map) address.getOptions().get(LINK);
- if (link_props != null)
+ if (_linkPropMap != null)
{
- linkProps = new MapAccessor(link_props);
+ _linkPropAccess = new MapAccessor(_linkPropMap);
}
}
public String getCreate()
{
- return addressProps.getString(CREATE);
+ return _addressPropAccess.getString(CREATE);
}
public String getAssert()
{
- return addressProps.getString(ASSERT);
+ return _addressPropAccess.getString(ASSERT);
}
public String getDelete()
{
- return addressProps.getString(DELETE);
- }
-
- public boolean isNoLocal()
- {
- Boolean b = nodeProps.getBoolean(NO_LOCAL);
- return b == null ? false : b;
+ return _addressPropAccess.getString(DELETE);
}
public boolean isBrowseOnly()
{
- String mode = addressProps.getString(MODE);
+ String mode = _addressPropAccess.getString(MODE);
return mode != null && mode.equals(BROWSE) ? true : false;
}
@@ -127,7 +124,7 @@ public class AddressHelper
public List<Binding> getBindings(Map props)
{
List<Binding> bindings = new ArrayList<Binding>();
- List<Map> bindingList = (List<Map>) props.get(X_BINDINGS);
+ List<Map> bindingList = (props == null) ? Collections.EMPTY_LIST : (List<Map>) props.get(X_BINDINGS);
if (bindingList != null)
{
for (Map bindingMap : bindingList)
@@ -157,117 +154,70 @@ public class AddressHelper
}
}
- public int getTargetNodeType() throws Exception
+ public int getNodeType() throws Exception
{
- if (nodeProps == null || nodeProps.getString(TYPE) == null)
+ if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null)
{
// need to query and figure out
return AMQDestination.UNKNOWN_TYPE;
- } else if (nodeProps.getString(TYPE).equals("queue"))
+ }
+ else if (_nodePropAccess.getString(TYPE).equals("queue"))
{
return AMQDestination.QUEUE_TYPE;
- } else if (nodeProps.getString(TYPE).equals("topic"))
+ }
+ else if (_nodePropAccess.getString(TYPE).equals("topic"))
{
return AMQDestination.TOPIC_TYPE;
- } else
+ }
+ else
{
throw new Exception("unkown exchange type");
}
}
- public Node getTargetNode(int addressType)
+ public Node getNode()
{
- // target node here is the default exchange
- if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE)
- {
- return new ExchangeNode();
- } else if (addressType == AMQDestination.TOPIC_TYPE)
- {
- Map node = (Map) address.getOptions().get(NODE);
- return createExchangeNode(node);
- } else
+ Node node = new Node(_address.getName());
+ if (_nodePropAccess != null)
{
- // don't know yet
- return null;
- }
- }
-
- private Node createExchangeNode(Map parent)
- {
- Map declareArgs = getDeclareArgs(parent);
- MapAccessor argsMap = new MapAccessor(declareArgs);
- ExchangeNode node = new ExchangeNode();
- node.setExchangeType(argsMap.getString(TYPE) == null ? null : argsMap
- .getString(TYPE));
- fillInCommonNodeArgs(node, parent, argsMap);
- return node;
- }
+ Map xDeclareMap = getDeclareArgs(_nodePropMap);
+ MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap);
- private Node createQueueNode(Map parent)
- {
- Map declareArgs = getDeclareArgs(parent);
- MapAccessor argsMap = new MapAccessor(declareArgs);
- QueueNode node = new QueueNode();
- node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
- node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null ? false
- : argsMap.getBoolean(EXCLUSIVE));
- fillInCommonNodeArgs(node, parent, argsMap);
-
- return node;
- }
-
- private void fillInCommonNodeArgs(Node node, Map parent, MapAccessor argsMap)
- {
- node.setDurable(getDurability(parent));
- node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null ? false
- : argsMap.getBoolean(AUTO_DELETE));
- node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
- node.setBindings(getBindings(parent));
- if (getDeclareArgs(parent).containsKey(ARGUMENTS))
- {
- node.setDeclareArgs((Map<String,Object>)getDeclareArgs(parent).get(ARGUMENTS));
+ node.setDurable(getBooleanProperty(_nodePropAccess,DURABLE,false));
+ node.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,false));
+ node.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,false));
+ node.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE));
+ if (xDeclareMapAccessor.getString(TYPE) != null)
+ {
+ node.setExchangeType(xDeclareMapAccessor.getString(TYPE));
+ }
+ node.setBindings(getBindings(_nodePropMap));
+ if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS))
+ {
+ node.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS));
+ }
}
- }
-
- private boolean getDurability(Map map)
- {
- Accessor access = new MapAccessor(map);
- Boolean result = access.getBoolean(DURABLE);
- return (result == null) ? false : result.booleanValue();
+ return node;
}
- /**
- * if the type == queue x-declare args from the node props is used. if the
- * type == exchange x-declare args from the link props is used else just
- * create a default temp queue.
- */
- public Node getSourceNode(int addressType)
+ // This should really be in the Accessor interface
+ private boolean getBooleanProperty(Accessor access, String propName, boolean defaultValue)
{
- if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
- {
- return createQueueNode((Map) address.getOptions().get(NODE));
- }
- if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null)
- {
- return createQueueNode((Map) address.getOptions().get(LINK));
- } else
- {
- // need to query the info
- return new QueueNode();
- }
+ Boolean result = access.getBoolean(propName);
+ return (result == null) ? defaultValue : result.booleanValue();
}
public Link getLink() throws Exception
{
Link link = new Link();
link.setSubscription(new Subscription());
- if (linkProps != null)
+ link.setSubscriptionQueue(new SubscriptionQueue());
+ if (_linkPropAccess != null)
{
- link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
- : linkProps.getBoolean(DURABLE));
- link.setName(linkProps.getString(NAME));
+ link.setDurable(getBooleanProperty(_linkPropAccess,DURABLE,false));
+ link.setName(_linkPropAccess.getString(NAME));
- String reliability = linkProps.getString(RELIABILITY);
+ String reliability = _linkPropAccess.getString(RELIABILITY);
if ( reliability != null)
{
if (reliability.equalsIgnoreCase("unreliable"))
@@ -283,13 +233,12 @@ public class AddressHelper
throw new Exception("The reliability mode '" +
reliability + "' is not yet supported");
}
-
}
- if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
+ if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
{
MapAccessor capacityProps = new MapAccessor(
- (Map) ((Map) address.getOptions().get(LINK))
+ (Map) ((Map) _address.getOptions().get(LINK))
.get(CAPACITY));
link
.setConsumerCapacity(capacityProps
@@ -302,17 +251,19 @@ public class AddressHelper
}
else
{
- int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
+ int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess
.getInt(CAPACITY);
link.setConsumerCapacity(cap);
link.setProducerCapacity(cap);
}
- link.setFilter(linkProps.getString(FILTER));
+ link.setFilter(_linkPropAccess.getString(FILTER));
// so far filter type not used
- if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
+ Map linkMap = (Map) _address.getOptions().get(LINK);
+
+ if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE))
{
- Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+ Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
if (x_subscribe.containsKey(ARGUMENTS))
{
@@ -324,6 +275,18 @@ public class AddressHelper
link.getSubscription().setExclusive(exclusive);
}
+
+ link.setBindings(getBindings(linkMap));
+ Map xDeclareMap = getDeclareArgs(linkMap);
+ SubscriptionQueue queue = link.getSubscriptionQueue();
+ if (!xDeclareMap.isEmpty() && xDeclareMap.containsKey(ARGUMENTS))
+ {
+ MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap);
+ queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true));
+ queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true));
+ queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE));
+ queue.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS));
+ }
}
return link;
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index 41f6725c8f..40a84ebd02 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.client.messaging.address;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.qpid.client.AMQDestination.Binding;
+
public class Link
{
public enum FilterType { SQL92, XQUERY, SUBJECT }
@@ -36,10 +41,11 @@ public class Link
private boolean _isDurable;
private int _consumerCapacity = 0;
private int _producerCapacity = 0;
- private Node node;
private Subscription subscription;
private Reliability reliability = Reliability.AT_LEAST_ONCE;
-
+ private List<Binding> _bindings = new ArrayList<Binding>();
+ private SubscriptionQueue _subscriptionQueue;
+
public Reliability getReliability()
{
return reliability;
@@ -50,21 +56,11 @@ public class Link
this.reliability = reliability;
}
- public Node getNode()
- {
- return node;
- }
-
- public void setNode(Node node)
- {
- this.node = node;
- }
-
public boolean isDurable()
{
return _isDurable;
}
-
+
public void setDurable(boolean durable)
{
_isDurable = durable;
@@ -139,6 +135,74 @@ public class Link
{
this.subscription = subscription;
}
+
+ public List<Binding> getBindings()
+ {
+ return _bindings;
+ }
+
+ public void setBindings(List<Binding> bindings)
+ {
+ _bindings = bindings;
+ }
+
+ public SubscriptionQueue getSubscriptionQueue()
+ {
+ return _subscriptionQueue;
+ }
+
+ public void setSubscriptionQueue(SubscriptionQueue subscriptionQueue)
+ {
+ this._subscriptionQueue = subscriptionQueue;
+ }
+
+ public static class SubscriptionQueue
+ {
+ private Map<String,Object> _declareArgs = new HashMap<String,Object>();
+ private boolean _isAutoDelete = true;
+ private boolean _isExclusive = true;
+ private String _alternateExchange;
+
+ public Map<String,Object> getDeclareArgs()
+ {
+ return _declareArgs;
+ }
+
+ public void setDeclareArgs(Map<String,Object> options)
+ {
+ _declareArgs = options;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _isAutoDelete;
+ }
+
+ public void setAutoDelete(boolean autoDelete)
+ {
+ _isAutoDelete = autoDelete;
+ }
+
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _isExclusive = exclusive;
+ }
+
+ public String getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(String altExchange)
+ {
+ _alternateExchange = altExchange;
+ }
+ }
public static class Subscription
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
index 0da0327885..005f98f344 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
@@ -26,19 +26,33 @@ import org.apache.qpid.client.AMQDestination.Binding;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public abstract class Node
+public class Node
{
private int _nodeType = AMQDestination.UNKNOWN_TYPE;
+ private String _name;
private boolean _isDurable;
private boolean _isAutoDelete;
+ private boolean _isExclusive;
private String _alternateExchange;
+ private String _exchangeType = "topic"; // used when node is an exchange instead of a queue.
private List<Binding> _bindings = new ArrayList<Binding>();
- private Map<String,Object> _declareArgs = Collections.emptyMap();
+ private Map<String,Object> _declareArgs = new HashMap<String,Object>();
- protected Node(int nodeType)
+ protected Node(String name)
+ {
+ _name = name;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public void setNodeType(int nodeType)
{
_nodeType = nodeType;
}
@@ -58,6 +72,16 @@ public abstract class Node
_isDurable = durable;
}
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _isExclusive = exclusive;
+ }
+
public boolean isAutoDelete()
{
return _isAutoDelete;
@@ -100,56 +124,15 @@ public abstract class Node
public void setDeclareArgs(Map<String,Object> options)
{
_declareArgs = options;
- }
-
- public static class QueueNode extends Node
- {
- private boolean _isExclusive;
- private QpidQueueOptions _queueOptions = new QpidQueueOptions();
-
- public QueueNode()
- {
- super(AMQDestination.QUEUE_TYPE);
- }
-
- public boolean isExclusive()
- {
- return _isExclusive;
- }
-
- public void setExclusive(boolean exclusive)
- {
- _isExclusive = exclusive;
- }
}
-
- public static class ExchangeNode extends Node
- {
- private QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions();
- private String _exchangeType;
-
- public ExchangeNode()
- {
- super(AMQDestination.TOPIC_TYPE);
- }
-
- public String getExchangeType()
- {
- return _exchangeType;
- }
-
- public void setExchangeType(String exchangeType)
- {
- _exchangeType = exchangeType;
- }
-
+
+ public void setExchangeType(String type)
+ {
+ _exchangeType = type;
}
-
- public static class UnknownNodeType extends Node
+
+ public String getExchangeType()
{
- public UnknownNodeType()
- {
- super(AMQDestination.UNKNOWN_TYPE);
- }
+ return _exchangeType;
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java b/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java
new file mode 100644
index 0000000000..a602dcbfd4
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java
@@ -0,0 +1,126 @@
+package org.apache.qpid.client.messaging.address;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQDestination.AddressOption;
+import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Link.Reliability;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class AddressHelperTest extends QpidTestCase
+{
+ public void testAddressOptions() throws Exception
+ {
+ Address addr = Address.parse("queue/test;{create:sender, assert:always, delete:receiver, mode:browse}");
+ AddressHelper helper = new AddressHelper(addr);
+ assertEquals(AddressOption.SENDER,AddressOption.getOption(helper.getCreate()));
+ assertEquals(AddressOption.ALWAYS,AddressOption.getOption(helper.getAssert()));
+ assertEquals(AddressOption.RECEIVER,AddressOption.getOption(helper.getDelete()));
+ assertTrue("'mode' option wasn't read properly",helper.isBrowseOnly());
+ }
+
+ public void testNodeProperties() throws Exception
+ {
+ Address addr = Address.parse("my-queue;{" +
+ "node: " +
+ "{" +
+ "type: queue ," +
+ "durable: true ," +
+ "x-declare: " +
+ "{" +
+ "exclusive: true," +
+ "auto-delete: true," +
+ "alternate-exchange: 'amq.fanout'," +
+ "arguments: {" +
+ "'qpid.max_size': 1000," +
+ "'qpid.max_count': 100" +
+ "}" +
+ "}, " +
+ "x-bindings: [{exchange : 'amq.direct', queue:my-queue, key : test}, " +
+ "{exchange : 'amq.fanout', queue:my-queue}," +
+ "{exchange: 'amq.match', queue:my-queue, arguments: {x-match: any, dep: sales, loc: CA}}," +
+ "{exchange : 'amq.topic',queue:my-queue, key : 'a.#'}" +
+ "]" +
+
+ "}" +
+ "}");
+ AddressHelper helper = new AddressHelper(addr);
+ Node node = helper.getNode();
+ assertEquals("'type' property wasn't read properly",AMQDestination.QUEUE_TYPE,helper.getNodeType());
+ assertTrue("'durable' property wasn't read properly",node.isDurable());
+ assertTrue("'auto-delete' property wasn't read properly",node.isAutoDelete());
+ assertTrue("'exclusive' property wasn't read properly",node.isExclusive());
+ assertEquals("'alternate-exchange' property wasn't read properly","amq.fanout",node.getAlternateExchange());
+ assertEquals("'arguments' in 'x-declare' property wasn't read properly",2,node.getDeclareArgs().size());
+ assertEquals("'bindings' property wasn't read properly",4,node.getBindings().size());
+ for (Binding binding: node.getBindings())
+ {
+ assertTrue("property 'exchange' in bindings wasn't read properly",binding.getExchange().startsWith("amq."));
+ assertEquals("property 'queue' in bindings wasn't read properly","my-queue",binding.getQueue());
+ if (binding.getExchange().equals("amq.direct"))
+ {
+ assertEquals("'key' property in bindings wasn't read properly","test",binding.getBindingKey());
+ }
+ if (binding.getExchange().equals("amq.match"))
+ {
+ assertEquals("'arguments' property in bindings wasn't read properly",3,binding.getArgs().size());
+ }
+ }
+ }
+
+ public void testLinkProperties() throws Exception
+ {
+ Address addr = Address.parse("my-queue;{" +
+ "link: " +
+ "{" +
+ "name: my-queue ," +
+ "durable: true ," +
+ "reliability: at-least-once," +
+ "capacity: {source:10, target:15}," +
+ "x-declare: " +
+ "{" +
+ "exclusive: true," +
+ "auto-delete: true," +
+ "alternate-exchange: 'amq.fanout'," +
+ "arguments: {" +
+ "'qpid.max_size': 1000," +
+ "'qpid.max_count': 100" +
+ "}" +
+ "}, " +
+ "x-bindings: [{exchange : 'amq.direct', queue:my-queue, key : test}, " +
+ "{exchange : 'amq.fanout', queue:my-queue}," +
+ "{exchange: 'amq.match', queue:my-queue, arguments: {x-match: any, dep: sales, loc: CA}}," +
+ "{exchange : 'amq.topic',queue:my-queue, key : 'a.#'}" +
+ "]," +
+ "x-subscribes:{exclusive: true, arguments: {a:b,x:y}}" +
+ "}" +
+ "}");
+
+ AddressHelper helper = new AddressHelper(addr);
+ Link link = helper.getLink();
+ assertEquals("'name' property wasn't read properly","my-queue",link.getName());
+ assertTrue("'durable' property wasn't read properly",link.isDurable());
+ assertEquals("'reliability' property wasn't read properly",Reliability.AT_LEAST_ONCE,link.getReliability());
+ assertTrue("'auto-delete' property in 'x-declare' wasn't read properly",link.getSubscriptionQueue().isAutoDelete());
+ assertTrue("'exclusive' property in 'x-declare' wasn't read properly",link.getSubscriptionQueue().isExclusive());
+ assertEquals("'alternate-exchange' property in 'x-declare' wasn't read properly","amq.fanout",link.getSubscriptionQueue().getAlternateExchange());
+ assertEquals("'arguments' in 'x-declare' property wasn't read properly",2,link.getSubscriptionQueue().getDeclareArgs().size());
+ assertEquals("'bindings' property wasn't read properly",4,link.getBindings().size());
+ for (Binding binding: link.getBindings())
+ {
+ assertTrue("property 'exchange' in bindings wasn't read properly",binding.getExchange().startsWith("amq."));
+ assertEquals("property 'queue' in bindings wasn't read properly","my-queue",binding.getQueue());
+ if (binding.getExchange().equals("amq.direct"))
+ {
+ assertEquals("'key' property in bindings wasn't read properly","test",binding.getBindingKey());
+ }
+ if (binding.getExchange().equals("amq.match"))
+ {
+ assertEquals("'arguments' property in bindings wasn't read properly",3,binding.getArgs().size());
+ }
+ }
+ assertTrue("'exclusive' property in 'x-subscribe' wasn't read properly",link.getSubscription().isExclusive());
+ assertEquals("'arguments' in 'x-subscribe' property wasn't read properly",2,link.getSubscription().getArgs().size());
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index e1f93b975b..08ee70f072 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -29,8 +29,6 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.message.QpidMessageProperties;
-import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -98,7 +96,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,true));
// create always -------------------------------------------
@@ -107,10 +105,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+ dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs()));
// create receiver -----------------------------------------
addr1 = "ADDR:testQueue2; { create: receiver }";
@@ -126,16 +124,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+ dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs()));
// create never --------------------------------------------
addr1 = "ADDR:testQueue3; { create: never }";
@@ -161,7 +159,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
// create sender ------------------------------------------
addr1 = "ADDR:testQueue3; { create: sender }";
@@ -177,14 +175,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"doesn't resolve to an exchange or a queue"));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
prod = jmsSession.createProducer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
+ dest.getAddressName(),dest.getAddressName(), dest.getNode().getDeclareArgs()));
}
@@ -221,7 +219,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
// Even if the consumer is closed the queue and the bindings should be intact.
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
@@ -326,7 +324,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertTrue("Exchange not created as expected",(
- (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true));
+ (AMQSession_0_10)jmsSession).isExchangeExist(dest,true));
// The existence of the queue is implicitly tested here
assertTrue("Queue not bound as expected",(
@@ -367,7 +365,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception
{
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
@@ -506,14 +504,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
MessageConsumer cons3 = jmsSession.createConsumer(dest3);
assertTrue("Destination1 was not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest1, true));
assertTrue("Destination1 was not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
dest1.getAddressName(),dest1.getAddressName(), null));
assertTrue("Destination2 was not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest2,true));
assertTrue("Destination2 was not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
@@ -602,14 +600,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
cons.close();
// Using the ADDR method to create a more complicated queue
- String addr = "ADDR:amq.direct/x512; {create: receiver, " +
+ String addr = "ADDR:amq.direct/x512; {" +
"link : {name : 'MY.RESP.QUEUE', " +
"x-declare : { auto-delete: true, exclusive: true, " +
"arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }";
queue = ssn.createQueue(addr);
- prod = ssn.createProducer(queue);
cons = ssn.createConsumer(queue);
+ prod = ssn.createProducer(queue);
assertTrue("MY.RESP.QUEUE was not created as expected",(
(AMQSession_0_10)ssn).isQueueBound("amq.direct",
"MY.RESP.QUEUE","x512", null));
@@ -677,8 +675,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
// Using the ADDR method to create a more complicated topic
topic = ssn.createTopic(addr);
- prod = ssn.createProducer(topic);
cons = ssn.createConsumer(topic);
+ prod = ssn.createProducer(topic);
assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
(AMQSession_0_10)ssn).isQueueBound("vehicles",
@@ -840,7 +838,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"}";
// Using the ADDR method to create a more complicated topic
- MessageConsumer cons = ssn.createConsumer(new AMQAnyDestination(addr));
+ Topic topic = ssn.createTopic(addr);
+ MessageConsumer cons = ssn.createConsumer(topic);
assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",(
(AMQSession_0_10)ssn).isQueueBound("MRKT",
@@ -854,7 +853,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
(AMQSession_0_10)ssn).isQueueBound("MRKT",
"my-topic","CNTL.#", null));
- MessageProducer prod = ssn.createProducer(ssn.createTopic(addr));
+ MessageProducer prod = ssn.createProducer(topic);
Message msg = ssn.createTextMessage("test");
msg.setStringProperty("qpid.subject", "NASDAQ.ABCD");
prod.send(msg);
@@ -909,32 +908,31 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String bindingStr = "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}";
+
Properties props = new Properties();
props.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
- props.setProperty("destination.address1", "ADDR:amq.topic");
- props.setProperty("destination.address2", "ADDR:amq.direct/test");
- String addrStr = "ADDR:amq.topic/test; {link:{name: my-topic," +
- "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}";
- props.setProperty("destination.address3", addrStr);
- props.setProperty("topic.address4", "hello.world");
- addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+ props.setProperty("destination.address1", "ADDR:amq.topic/test");
+ props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr);
+ props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr);
+ String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
props.setProperty("destination.address5", addrStr);
Context ctx = new InitialContext(props);
- for (int i=1; i < 5; i++)
+ for (int i=1; i < 4; i++)
{
Topic topic = (Topic) ctx.lookup("address"+i);
- createDurableSubscriber(ctx,ssn,"address"+i,topic);
+ createDurableSubscriber(ctx,ssn,"address"+i,topic,"ADDR:amq.topic/test");
}
Topic topic = ssn.createTopic("ADDR:news.us");
- createDurableSubscriber(ctx,ssn,"my-dest",topic);
+ createDurableSubscriber(ctx,ssn,"my-dest",topic,"ADDR:news.us");
Topic namedQueue = (Topic) ctx.lookup("address5");
try
{
- createDurableSubscriber(ctx,ssn,"my-queue",namedQueue);
+ createDurableSubscriber(ctx,ssn,"my-queue",namedQueue,"ADDR:amq.topic/test");
fail("Exception should be thrown. Durable subscribers cannot be created for Queues");
}
catch(JMSException e)
@@ -944,15 +942,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
}
- private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic) throws Exception
+ private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception
{
MessageConsumer cons = ssn.createDurableSubscriber(topic, destName);
- MessageProducer prod = ssn.createProducer(topic);
+ MessageProducer prod = ssn.createProducer(ssn.createTopic(producerAddr));
Message m = ssn.createTextMessage(destName);
prod.send(m);
Message msg = cons.receive(1000);
- assertNotNull(msg);
+ assertNotNull("Message not received as expected when using Topic : " + topic,msg);
assertEquals(destName,((TextMessage)msg).getText());
ssn.unsubscribe(destName);
}
@@ -977,7 +975,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
@@ -993,7 +991,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
@@ -1010,9 +1008,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
-
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
}
/**
@@ -1307,4 +1303,56 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull("message should be re-received by consumer after rollback", receivedMessage);
jmsSession.commit();
}
+
+ /**
+ * Test Goals :
+ *
+ * 1. Verify that link bindings are created and destroyed after creating and closing a subscriber.
+ * 2. Verify that link bindings are created and destroyed after creating and closing a subscriber.
+ */
+ public void testLinkBindingBehavior() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String addr = "ADDR:my-queue; {create: always, " +
+ "link: " +
+ "{" +
+ "x-bindings: [{exchange : 'amq.direct', key : test}]," +
+ "}" +
+ "}";
+
+ AMQDestination dest = (AMQDestination)jmsSession.createQueue(addr);
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+ AMQSession_0_10 ssn = (AMQSession_0_10)jmsSession;
+
+ assertTrue("Queue not created as expected",ssn.isQueueExist(dest, true));
+ assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null));
+
+ cons.close(); // closing consumer, link binding should be removed now.
+ assertTrue("Queue should still be there",ssn.isQueueExist(dest, true));
+ assertFalse("Binding should not exist anymore",ssn.isQueueBound("amq.direct","my-queue","test", null));
+
+ MessageProducer prod = jmsSession.createProducer(dest);
+ assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null));
+ prod.close();
+ assertFalse("Binding should not exist anymore",ssn.isQueueBound("amq.direct","my-queue","test", null));
+ }
+
+ /**
+ * Test Goals : Verifies that the subscription queue created is as specified under link properties.
+ */
+ public void testCustomizingSubscriptionQueue() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ String xDeclareArgs = "x-declare: { exclusive: false, auto-delete: false," +
+ "alternate-exchange: 'amq.fanout'," +
+ "arguments: {'qpid.max_size': 1000,'qpid.max_count': 100}" +
+ "}";
+
+ String addr = "ADDR:amq.topic/test; {link: {name:my-queue, durable:true," + xDeclareArgs + "}}";
+ MessageConsumer cons = ssn.createConsumer(ssn.createTopic(addr));
+
+ String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}";
+ AMQDestination verifyDest = (AMQDestination)ssn.createQueue(verifyAddr);
+ ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true);
+ }
}