summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-04-15 19:39:06 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-04-15 19:39:06 +0000
commit7836498914eab08ec0eb7b10824deaa4d78db976 (patch)
tree09f9e527496e8fca2b60e4bfdc73329c8ca2957d /java
parentc9ea7911e448675d483305d15a0d734542d98eb1 (diff)
downloadqpid-python-7836498914eab08ec0eb7b10824deaa4d78db976.tar.gz
This is related to QPID-2496
The changes include support the new addressing structure and most items on the list specified in the JIRA. The following is not included in the commit 1. Add subject as filter in JMS - for exchanges use it as binding key and for queues use it as a selector - this needs to be thought through. Besides JMS already provides a way to handle this. 2. Implementation of DELETE option. Further testing needs to be done to figure out the impact. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@934563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java165
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java295
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java265
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java106
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java162
-rw-r--r--java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java120
7 files changed, 835 insertions, 280 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 1ed64e7890..2d7844944a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -32,6 +32,8 @@ import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import org.apache.qpid.client.messaging.address.AddressHelper;
+import org.apache.qpid.client.messaging.address.Link;
+import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.messaging.address.QpidExchangeOptions;
import org.apache.qpid.client.messaging.address.QpidQueueOptions;
import org.apache.qpid.configuration.ClientProperties;
@@ -119,27 +121,25 @@ public abstract class AMQDestination implements Destination, Referenceable
}
}
- public enum FilterType { SQL92, XQUERY, SUBJECT }
+
protected static DestSyntax defaultDestSyntax;
protected DestSyntax _destSyntax;
+ protected AddressHelper _addrHelper;
protected Address _address;
+ protected int _addressType = AMQDestination.UNKNOWN_TYPE;
protected String _name;
protected String _subject;
protected AddressOption _create = AddressOption.NEVER;
- protected AddressOption _assert = AddressOption.ALWAYS;
- protected AddressOption _delete = AddressOption.NEVER;
-
- protected String _filter;
- protected FilterType _filterType = FilterType.SUBJECT;
- protected boolean _isNoLocal;
- protected int _nodeType = QUEUE_TYPE;
- protected String _alternateExchange;
- protected QpidQueueOptions _queueOptions;
- protected QpidExchangeOptions _exchangeOptions;
- protected List<Binding> _bindings = new ArrayList<Binding>();
+ protected AddressOption _assert = AddressOption.NEVER;
+ protected AddressOption _delete = AddressOption.NEVER;
+
+ protected Node _targetNode;
+ protected Node _sourceNode;
+ protected Link _targetLink;
+ protected Link _sourceLink;
// ----- / Fields required to support new address syntax -------
static
@@ -149,7 +149,7 @@ public abstract class AMQDestination implements Destination, Referenceable
DestSyntax.BURL.toString()));
}
- protected AMQDestination(Address address)
+ protected AMQDestination(Address address) throws Exception
{
this._address = address;
getInfoFromAddress();
@@ -169,7 +169,16 @@ public abstract class AMQDestination implements Destination, Referenceable
{
_destSyntax = DestSyntax.ADDR;
this._address = createAddressFromString(str);
- getInfoFromAddress();
+ try
+ {
+ getInfoFromAddress();
+ }
+ catch(Exception e)
+ {
+ URISyntaxException ex = new URISyntaxException(str,"Error parsing address");
+ ex.initCause(e);
+ throw ex;
+ }
}
_logger.info("Based on " + str + " the selected destination syntax is " + _destSyntax);
}
@@ -264,6 +273,11 @@ public abstract class AMQDestination implements Destination, Referenceable
_logger.info("Based on " + toString() + " the selected destination syntax is " + _destSyntax);
}
+ public DestSyntax getDestSyntax()
+ {
+ return _destSyntax;
+ }
+
public AMQShortString getEncodedName()
{
if(_urlAsShortString == null)
@@ -629,15 +643,21 @@ public abstract class AMQDestination implements Destination, Referenceable
}
// ----- new address syntax -----------
+
public static class Binding
{
String exchange;
String bindingKey;
+ String queue;
Map<String,Object> args;
- public Binding(String exchange,String bindingKey,Map<String,Object> args)
+ public Binding(String exchange,
+ String queue,
+ String bindingKey,
+ Map<String,Object> args)
{
this.exchange = exchange;
+ this.queue = queue;
this.bindingKey = bindingKey;
this.args = args;
}
@@ -647,6 +667,11 @@ public abstract class AMQDestination implements Destination, Referenceable
return exchange;
}
+ public String getQueue()
+ {
+ return queue;
+ }
+
public String getBindingKey()
{
return bindingKey;
@@ -662,7 +687,15 @@ public abstract class AMQDestination implements Destination, Referenceable
return _address;
}
- public String getName() {
+ public int getAddressType(){
+ return _addressType;
+ }
+
+ public void setAddressType(int addressType){
+ _addressType = addressType;
+ }
+
+ public String getAddressName() {
return _name;
}
@@ -670,6 +703,10 @@ public abstract class AMQDestination implements Destination, Referenceable
return _subject;
}
+ public void setSubject(String subject) {
+ _subject = subject;
+ }
+
public AddressOption getCreate() {
return _create;
}
@@ -681,49 +718,35 @@ public abstract class AMQDestination implements Destination, Referenceable
public AddressOption getDelete() {
return _delete;
}
-
- public String getFilter() {
- return _filter;
- }
-
- public FilterType getFilterType() {
- return _filterType;
- }
-
- public boolean isNoLocal() {
- return _isNoLocal;
- }
-
- public int getNodeType() {
- return _nodeType;
+
+ public Node getTargetNode()
+ {
+ return _targetNode;
}
- public QpidQueueOptions getQueueOptions() {
- return _queueOptions;
+ public void setTargetNode(Node node)
+ {
+ _targetNode = node;
}
- public List<Binding> getBindings() {
- return _bindings;
+ public Node getSourceNode()
+ {
+ return _sourceNode;
}
- public void addBinding(Binding binding) {
- this._bindings.add(binding);
- }
-
- public DestSyntax getDestSyntax() {
- return _destSyntax;
- }
-
- public QpidExchangeOptions getExchangeOptions() {
- return _exchangeOptions;
+ public void setSourceNode(Node node)
+ {
+ _sourceNode = node;
}
- public String getAlternateExchange() {
- return _alternateExchange;
+ public Link getSourceLink()
+ {
+ return _sourceLink;
}
- public void setAlternateExchange(String alternateExchange) {
- this._alternateExchange = alternateExchange;
+ public void setSourceLink(Link link)
+ {
+ _sourceLink = link;
}
public void setExchangeName(AMQShortString name)
@@ -750,37 +773,35 @@ public abstract class AMQDestination implements Destination, Referenceable
return Address.parse(str);
}
- private void getInfoFromAddress()
+ private void getInfoFromAddress() throws Exception
{
_name = _address.getName();
_subject = _address.getSubject();
- AddressHelper addrHelper = new AddressHelper(_address);
+ _addrHelper = new AddressHelper(_address);
- _create = addrHelper.getCreate() != null ?
- AddressOption.getOption(addrHelper.getCreate()):AddressOption.NEVER;
+ _create = _addrHelper.getCreate() != null ?
+ AddressOption.getOption(_addrHelper.getCreate()):AddressOption.NEVER;
- _assert = addrHelper.getAssert() != null ?
- AddressOption.getOption(addrHelper.getAssert()):AddressOption.ALWAYS;
+ _assert = _addrHelper.getAssert() != null ?
+ AddressOption.getOption(_addrHelper.getAssert()):AddressOption.NEVER;
- _delete = addrHelper.getDelete() != null ?
- AddressOption.getOption(addrHelper.getDelete()):AddressOption.NEVER;
+ _delete = _addrHelper.getDelete() != null ?
+ AddressOption.getOption(_addrHelper.getDelete()):AddressOption.NEVER;
- _filter = addrHelper.getFilter();
- _isNoLocal = addrHelper.isNoLocal();
- _isDurable = addrHelper.isDurable();
- _isAutoDelete = addrHelper.isAutoDelete();
- _isExclusive = addrHelper.isExclusive();
- _browseOnly = addrHelper.isBrowseOnly();
-
- _nodeType = addrHelper.getNodeType() == null || addrHelper.getNodeType().equals("queue")?
- QUEUE_TYPE : TOPIC_TYPE;
-
- _alternateExchange = addrHelper.getAltExchange();
-
- _queueOptions = addrHelper.getQpidQueueOptions();
- _exchangeOptions = addrHelper.getQpidExchangeOptions();
- _bindings = addrHelper.getBindings();
+ _addressType = _addrHelper.getTargetNodeType();
+ _targetNode = _addrHelper.getTargetNode(_addressType);
+ _sourceNode = _addrHelper.getSourceNode(_addressType);
+ _sourceLink = _addrHelper.getLink();
+ }
+
+ // This method is needed if we didn't know the node type at the beginning.
+ // Therefore we have to query the broker to figure out the type.
+ // Once the type is known we look for the necessary properties.
+ public void rebuildTargetAndSourceNodes(int addressType)
+ {
+ _targetNode = _addrHelper.getTargetNode(addressType);
+ _sourceNode = _addrHelper.getSourceNode(addressType);
}
// ----- / new address syntax -----------
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 704dbf8bfc..8064ed1ae6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -23,8 +23,10 @@ import static org.apache.qpid.transport.Option.SYNC;
import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.lang.ref.WeakReference;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -89,6 +91,9 @@ import java.util.Timer;
import java.util.TimerTask;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.messaging.address.Node;
+import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
+import org.apache.qpid.client.messaging.address.Node.QueueNode;
/**
* This is a 0.10 Session
@@ -354,13 +359,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
- for (Binding binding: destination.getBindings())
+ List<Binding> bindings = new ArrayList<Binding>();
+ bindings.addAll(destination.getSourceNode().getBindings());
+ bindings.addAll(destination.getTargetNode().getBindings());
+ for (Binding binding: bindings)
{
- _logger.debug("Binding queue : " + queueName.toString() +
+ String queue = binding.getQueue() == null?
+ queueName.asString(): binding.getQueue();
+ _logger.debug("Binding queue : " + queue +
" exchange: " + binding.getExchange() +
" using binding key " + binding.getBindingKey() +
" with args " + printMap(binding.getArgs()));
- getQpidSession().exchangeBind(queueName.toString(),
+ getQpidSession().exchangeBind(queue,
binding.getExchange(),
binding.getBindingKey(),
binding.getArgs());
@@ -687,21 +697,34 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
queueName = amqd.getAMQQueueName();
}
- Map<String,Object> arguments = new HashMap<String,Object>();
- if (noLocal || amqd.isNoLocal())
- {
- arguments.put("no-local", true);
+ if (amqd.getDestSyntax() == DestSyntax.BURL)
+ {
+ Map<String,Object> arguments = new HashMap<String,Object>();
+ if (noLocal)
+ {
+ arguments.put("no-local", true);
+ }
+
+ /*if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null)
+ {
+ arguments.putAll(amqd.getta);
+ }*/
+
+ getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
+ amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ amqd.isDurable() ? Option.DURABLE : Option.NONE,
+ amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
-
- if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null)
+ else
{
- arguments.putAll(amqd.getQueueOptions());
+ QueueNode node = (QueueNode)amqd.getSourceNode();
+ getQpidSession().queueDeclare(queueName.toString(), "" ,
+ node.getDeclareArgs(),
+ node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ node.isDurable() ? Option.DURABLE : Option.NONE,
+ node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
- getQpidSession().queueDeclare(queueName.toString(), amqd.getAlternateExchange() , arguments,
- amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
- amqd.isDurable() ? Option.DURABLE : Option.NONE,
- amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
if (!nowait)
{
@@ -1017,39 +1040,58 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return AMQMessageDelegateFactory.FACTORY_0_10;
}
- public boolean isExchangeExist(AMQDestination dest,boolean assertNode)
+ public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode)
{
boolean match = true;
- ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getName(), Option.NONE).get();
- match = !result.getNotFound();
+ ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
+ match = !result.getNotFound();
- if (match && assertNode)
- {
- match = (result.getDurable() == dest.isDurable()) &&
- (dest.getExchangeClass() != null) &&
- (dest.getExchangeClass().asString().equals(result.getType())) &&
- (matchProps(result.getArguments(),dest.getQueueOptions()));
- }
if (match)
{
- dest.setExchangeClass(new AMQShortString(result.getType()));
+ if (assertNode)
+ {
+ match = (result.getDurable() == node.isDurable()) &&
+ (node.getExchangeType() != null &&
+ node.getExchangeType().equals(result.getType())) &&
+ (matchProps(result.getArguments(),node.getDeclareArgs()));
+ }
+ else if (node.getExchangeType() != null)
+ {
+ // even if assert is false, better to verify this
+ match = node.getExchangeType().equals(result.getType());
+ if (!match)
+ {
+ _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() +
+ " actual " + result.getType());
+ }
+ }
+ else
+ {
+ _logger.debug("Setting Exchange type " + result.getType());
+ node.setExchangeType(result.getType());
+ dest.setExchangeClass(new AMQShortString(result.getType()));
+ }
}
return match;
}
- public boolean isQueueExist(AMQDestination dest,boolean assertNode)
+ public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode)
{
boolean match = true;
- QueueQueryResult result = getQpidSession().queueQuery(dest.getName(), Option.NONE).get();
- match = dest.getName().equals(result.getQueue());
+ QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
+ match = dest.getAddressName().equals(result.getQueue());
if (match && assertNode)
{
- match = (result.getDurable() == dest.isDurable()) &&
- (result.getAutoDelete() == dest.isAutoDelete()) &&
- (result.getExclusive() == dest.isExclusive()) &&
- (matchProps(result.getArguments(),dest.getQueueOptions()));
+ match = (result.getDurable() == node.isDurable()) &&
+ (result.getAutoDelete() == node.isAutoDelete()) &&
+ (result.getExclusive() == node.isExclusive()) &&
+ (matchProps(result.getArguments(),node.getDeclareArgs()));
+ }
+ else if (match)
+ {
+ // should I use the queried details to update the local data structure.
}
return match;
@@ -1063,76 +1105,177 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
match = target.containsKey(key) &&
target.get(key).equals(source.get(key));
- if (!match) return match;
+ if (!match)
+ {
+ StringBuffer buf = new StringBuffer();
+ buf.append("Property given in address did not match with the args sent by the broker.");
+ buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, ");
+ buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }");
+ _logger.debug(buf.toString());
+ return match;
+ }
}
return match;
}
+ @SuppressWarnings("deprecation")
public void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
boolean noWait) throws AMQException
{
- boolean noLocal = dest.isNoLocal();
boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
(isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
(!isConsumer && dest.getAssert() == AddressOption.SENDER);
+
+ boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
+ (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getCreate() == AddressOption.SENDER);
- if (isExchangeExist(dest,assertNode))
+
+ int type = resolveAddressType(dest);
+
+ switch (type)
{
- dest.setExchangeName(new AMQShortString(dest.getName()));
- dest.setRoutingKey(new AMQShortString(dest.getSubject()));
- if (isConsumer)
+ case AMQDestination.QUEUE_TYPE:
{
- dest.setQueueName(null);
- dest.addBinding(new Binding(dest.getName(),
- dest.getSubject(),
- null));
+ if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode))
+ {
+ setLegacyFiledsForQueueType(dest);
+ break;
+ }
+ else if(createNode)
+ {
+ setLegacyFiledsForQueueType(dest);
+ send0_10QueueDeclare(dest,null,false,noWait);
+ break;
+ }
}
+
+ case AMQDestination.TOPIC_TYPE:
+ {
+ if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode))
+ {
+ setLegacyFiledsForTopicType(dest);
+ verifySubject(dest);
+ createSubscriptionQueue(dest);
+ break;
+ }
+ else if(createNode)
+ {
+ setLegacyFiledsForTopicType(dest);
+ verifySubject(dest);
+ sendExchangeDeclare(dest.getAddressName(),
+ dest.getExchangeClass().asString(),
+ dest.getTargetNode().getAlternateExchange(),
+ dest.getTargetNode().getDeclareArgs(),
+ false);
+ createSubscriptionQueue(dest);
+ break;
+ }
+ }
+
+ default:
+ throw new AMQException(
+ "The name '" + dest.getAddressName() +
+ "' supplied in the address doesn't resolve to an exchange or a queue");
}
- else if (isQueueExist(dest,assertNode))
- {
- dest.setQueueName(new AMQShortString(dest.getName()));
- dest.setExchangeName(new AMQShortString(""));
- dest.setExchangeClass(new AMQShortString(""));
- dest.setRoutingKey(dest.getAMQQueueName());
- }
- else if (dest.getCreate() == AddressOption.ALWAYS ||
- dest.getCreate() == AddressOption.RECEIVER && isConsumer ||
- dest.getCreate() == AddressOption.SENDER && !isConsumer)
+ }
+
+ int resolveAddressType(AMQDestination dest) throws AMQException
+ {
+ int type = dest.getAddressType();
+ String name = dest.getAddressName();
+ if (type != AMQDestination.UNKNOWN_TYPE)
+ {
+ return type;
+ }
+ else
+ {
+ ExchangeBoundResult result = getQpidSession().exchangeBound(name,name,null,null).get();
+ if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+ //neither a queue nor an exchange exists with that name; treat it as a queue
+ type = AMQDestination.QUEUE_TYPE;
+ } else if (result.getExchangeNotFound()) {
+ //name refers to a queue
+ type = AMQDestination.QUEUE_TYPE;
+ } else if (result.getQueueNotFound()) {
+ //name refers to an exchange
+ type = AMQDestination.TOPIC_TYPE;
+ } else {
+ //both a queue and exchange exist for that name
+ throw new AMQException("Ambiguous address, please specify queue or topic as node type");
+ }
+ dest.rebuildTargetAndSourceNodes(type);
+ return type;
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ private void verifySubject(AMQDestination dest) throws AMQException
+ {
+ if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
{
- if (dest.getNodeType() == AMQDestination.QUEUE_TYPE)
+ if (dest.getExchangeClass() == ExchangeDefaults.TOPIC_EXCHANGE_CLASS)
{
- dest.setQueueName(new AMQShortString(dest.getName()));
- dest.setExchangeName(new AMQShortString(""));
- dest.setExchangeClass(new AMQShortString(""));
- dest.setRoutingKey(dest.getAMQQueueName());
+ dest.setRoutingKey(ExchangeDefaults.WILDCARD_ANY);
+ dest.setSubject(ExchangeDefaults.WILDCARD_ANY.toString());
}
- else
+ else if (dest.getExchangeClass() == ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
{
- dest.setQueueName(null);
- dest.setExchangeName(new AMQShortString(dest.getName()));
- dest.setExchangeClass(dest.getExchangeClass() == null?
- ExchangeDefaults.TOPIC_EXCHANGE_CLASS:dest.getExchangeClass());
- dest.setRoutingKey(new AMQShortString(dest.getSubject()));
- dest.addBinding(new Binding(dest.getName(),
- dest.getSubject(),
- null));
-
- sendExchangeDeclare(dest.getName(), dest.getExchangeClass().asString(),
- dest.getAlternateExchange(), dest.getExchangeOptions(),false);
-
+ throw new AMQException("If sending to an exchange of type direct," +
+ " a valid subject must be specified");
}
-
- send0_10QueueDeclare(dest,null,noLocal,noWait);
}
- else
+ }
+
+ private void createSubscriptionQueue(AMQDestination dest) throws AMQException
+ {
+ if (dest.getSourceNode() != null)
{
- throw new AMQException(
- "The name '" + dest.getName() +
- "' supplied in the address doesn't resolve to an exchange or a queue");
+ QueueNode node = (QueueNode)dest.getSourceNode();
+ if (dest.getQueueName() == null || !isQueueExist(dest,node,true))
+ {
+ // can name : my-queue be used in x-declare?
+ // if so set it to dest queue name
+ // if (dest.getQueueName() == null) { dest.setName(node.getName()) }
+ send0_10QueueDeclare(dest,null,false,false);
+ }
+ node.addBinding(new Binding(dest.getAddressName(),
+ dest.getQueueName(),// should have one by now
+ dest.getSubject(),
+ node.getDeclareArgs()));
}
+ else
+ {
+ send0_10QueueDeclare(dest,null,false,false);
+ dest.getTargetNode().addBinding(new Binding(dest.getAddressName(),
+ null,
+ dest.getSubject(),
+ null));
+ }
+ }
+
+ private void setLegacyFiledsForQueueType(AMQDestination dest)
+ {
+ // legacy support
+ dest.setQueueName(new AMQShortString(dest.getAddressName()));
+ dest.setExchangeName(new AMQShortString(""));
+ dest.setExchangeClass(new AMQShortString(""));
+ dest.setRoutingKey(dest.getAMQQueueName());
+ }
+
+ private void setLegacyFiledsForTopicType(AMQDestination dest)
+ {
+ // legacy support
+ dest.setQueueName(null);
+ dest.setExchangeName(new AMQShortString(dest.getAddressName()));
+ ExchangeNode node = (ExchangeNode)dest.getTargetNode();
+ dest.setExchangeClass(node.getExchangeType() == null?
+ ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
+ new AMQShortString(node.getExchangeType()));
+ dest.setRoutingKey(new AMQShortString(dest.getSubject()));
}
/** This should be moved to a suitable utility class */
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 84bc4d596e..1d6dde7c29 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -21,10 +21,15 @@
package org.apache.qpid.client.messaging.address;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
+import org.apache.qpid.client.messaging.address.Node.QueueNode;
+import org.apache.qpid.client.messaging.address.Node.UnknownNodeType;
import org.apache.qpid.configuration.Accessor;
import org.apache.qpid.configuration.Accessor.MapAccessor;
import org.apache.qpid.messaging.Address;
@@ -35,8 +40,11 @@ import org.apache.qpid.messaging.Address;
*/
public class AddressHelper
{
- public static final String NODE_PROPS = "node-properties";
- public static final String X_PROPS = "x-properties";
+ public static final String NODE = "node";
+ public static final String LINK = "link";
+ public static final String X_DECLARE = "x-declare";
+ public static final String X_BINDINGS = "x-bindings";
+ public static final String X_SUBSCRIBE = "x-subscribes";
public static final String CREATE = "create";
public static final String ASSERT = "assert";
public static final String DELETE = "delete";
@@ -49,22 +57,34 @@ public class AddressHelper
public static final String ALT_EXCHANGE = "alt-exchange";
public static final String BINDINGS = "bindings";
public static final String BROWSE_ONLY = "browse";
+ public static final String CAPACITY = "capacity";
+ public static final String NAME = "name";
+ public static final String EXCHANGE = "exchange";
+ public static final String QUEUE = "queue";
+ public static final String KEY = "key";
+ public static final String ARGUMENTS = "arguments";
private Address address;
private Accessor addressProps;
private Accessor nodeProps;
- private Accessor xProps;
+ private Accessor linkProps;
public AddressHelper(Address address)
{
this.address = address;
addressProps = new MapAccessor(address.getOptions());
Map node_props = address.getOptions() == null ||
- address.getOptions().get(NODE_PROPS) == null ?
- null : (Map)address.getOptions().get(NODE_PROPS);
- nodeProps = new MapAccessor(node_props);
- xProps = new MapAccessor(node_props == null || node_props.get(X_PROPS) == null?
- null: (Map)node_props.get(X_PROPS));
+ address.getOptions().get(NODE) == null ?
+ null : (Map)address.getOptions().get(NODE);
+
+ if (node_props != null) { nodeProps = new MapAccessor(node_props); }
+
+
+ Map link_props = address.getOptions() == null ||
+ address.getOptions().get(LINK) == null ?
+ null : (Map)address.getOptions().get(LINK);
+
+ if (link_props != null) { linkProps = new MapAccessor(link_props); }
}
public String getCreate()
@@ -82,135 +102,218 @@ public class AddressHelper
return addressProps.getString(DELETE);
}
- public String getFilter()
- {
- return addressProps.getString(FILTER);
- }
-
public boolean isNoLocal()
{
Boolean b = nodeProps.getBoolean(NO_LOCAL);
return b == null ? false : b ;
}
-
- public boolean isDurable()
- {
- Boolean b = nodeProps.getBoolean(DURABLE);
- return b == null ? false : b ;
- }
-
- public boolean isExclusive()
- {
- Boolean b = xProps.getBoolean(EXCLUSIVE);
- return b == null ? false : b ;
- }
-
- public boolean isAutoDelete()
- {
- Boolean b = xProps.getBoolean(AUTO_DELETE);
- return b == null ? false : b ;
- }
-
+
public boolean isBrowseOnly()
{
- Boolean b = xProps.getBoolean(BROWSE_ONLY);
+ Boolean b = nodeProps.getBoolean(BROWSE_ONLY);
return b == null ? false : b ;
}
-
- public String getNodeType()
- {
- return nodeProps.getString(TYPE);
- }
-
- public String getAltExchange()
- {
- return xProps.getString(ALT_EXCHANGE);
- }
-
- public QpidQueueOptions getQpidQueueOptions()
+
+ public QpidQueueOptions getQpidQueueOptions(MapAccessor args)
{
QpidQueueOptions options = new QpidQueueOptions();
- if (xProps.getInt(QpidQueueOptions.QPID_MAX_COUNT) != null)
+ if (args.getInt(QpidQueueOptions.QPID_MAX_COUNT) != null)
{
- options.setMaxCount(xProps.getInt(QpidQueueOptions.QPID_MAX_COUNT));
+ options.setMaxCount(args.getInt(QpidQueueOptions.QPID_MAX_COUNT));
}
- if (xProps.getInt(QpidQueueOptions.QPID_MAX_SIZE) != null)
+ if (args.getInt(QpidQueueOptions.QPID_MAX_SIZE) != null)
{
- options.setMaxSize(xProps.getInt(QpidQueueOptions.QPID_MAX_SIZE));
+ options.setMaxSize(args.getInt(QpidQueueOptions.QPID_MAX_SIZE));
}
- if (xProps.getInt(QpidQueueOptions.QPID_POLICY_TYPE) != null)
+ if (args.getInt(QpidQueueOptions.QPID_POLICY_TYPE) != null)
{
- options.setPolicyType(xProps.getString(QpidQueueOptions.QPID_POLICY_TYPE));
+ options.setPolicyType(args.getString(QpidQueueOptions.QPID_POLICY_TYPE));
}
- if (xProps.getInt(QpidQueueOptions.QPID_PERSIST_LAST_NODE) != null)
+ if (args.getInt(QpidQueueOptions.QPID_PERSIST_LAST_NODE) != null)
{
options.setPersistLastNode();
}
- if (xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE) != null)
+ if (args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE) != null)
{
- options.setOrderingPolicy(xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE));
- options.setLvqKey(xProps.getString(QpidQueueOptions.QPID_LVQ_KEY));
+ options.setOrderingPolicy(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE));
+ options.setLvqKey(args.getString(QpidQueueOptions.QPID_LVQ_KEY));
}
- else if (xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE) != null)
+ else if (args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE) != null)
{
- options.setOrderingPolicy(xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE));
- options.setLvqKey(xProps.getString(QpidQueueOptions.QPID_LVQ_KEY));
+ options.setOrderingPolicy(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE));
+ options.setLvqKey(args.getString(QpidQueueOptions.QPID_LVQ_KEY));
}
- if (xProps.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION) != null)
+ if (args.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION) != null)
{
- options.setQueueEvents(xProps.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION));
+ options.setQueueEvents(args.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION));
}
return options;
}
- public QpidExchangeOptions getQpidExchangeOptions()
+ public QpidExchangeOptions getQpidExchangeOptions(MapAccessor args)
{
QpidExchangeOptions options = new QpidExchangeOptions();
- if (xProps.getInt(QpidExchangeOptions.QPID_EXCLUSIVE_BINDING) != null)
+ if (args.getInt(QpidExchangeOptions.QPID_EXCLUSIVE_BINDING) != null)
{
options.setExclusiveBinding();
}
- if (xProps.getInt(QpidExchangeOptions.QPID_INITIAL_VALUE_EXCHANGE) != null)
+ if (args.getInt(QpidExchangeOptions.QPID_INITIAL_VALUE_EXCHANGE) != null)
{
options.setInitialValueExchange();
}
- if (xProps.getInt(QpidExchangeOptions.QPID_MSG_SEQUENCE) != null)
+ if (args.getInt(QpidExchangeOptions.QPID_MSG_SEQUENCE) != null)
{
options.setMessageSequencing();
}
return options;
}
- public List<Binding> getBindings()
+ @SuppressWarnings("unchecked")
+ public List<Binding> getBindings(Map props)
{
- List<Binding> bindings = new ArrayList<Binding>();
- if (address.getOptions() != null &&
- address.getOptions().get(NODE_PROPS) != null &&
- ((Map)address.getOptions().get(NODE_PROPS)).get(X_PROPS) != null)
+ List<Binding> bindings = new ArrayList<Binding>();
+ List<Map> bindingList = (List<Map>)props.get(X_BINDINGS);
+ if (bindingList != null)
{
- Map node_props = (Map)address.getOptions().get(NODE_PROPS);
- List<String> bindingList =
- (List<String>)((Map)node_props.get(X_PROPS)).get(BINDINGS);
- if (bindingList != null)
+ for (Map bindingMap: bindingList)
{
- for (String bindingStr: bindingList)
- {
- Address addr = Address.parse(bindingStr);
- Binding binding = new Binding(addr.getName(),
- addr.getSubject(),
- addr.getOptions());
- bindings.add(binding);
- }
+ Binding binding = new Binding((String)bindingMap.get(EXCHANGE),
+ (String)bindingMap.get(QUEUE),
+ (String)bindingMap.get(KEY),
+ bindingMap.get(ARGUMENTS) == null ?
+ Collections.EMPTY_MAP:
+ (Map<String,Object>)bindingMap.get(ARGUMENTS));
+ bindings.add(binding);
}
}
return bindings;
}
+
+ public Map getDeclareArgs(Map props)
+ {
+ if (props != null)
+ {
+ return (Map)props.get(X_DECLARE);
+ }
+ else
+ {
+ return Collections.EMPTY_MAP;
+ }
+ }
+
+ public int getTargetNodeType() throws Exception
+ {
+ if (nodeProps == null || nodeProps.getString(TYPE) == null)
+ {
+ // need to query and figure out
+ return AMQDestination.UNKNOWN_TYPE;
+ }
+ else if (nodeProps.getString(TYPE).equals("queue"))
+ {
+ return AMQDestination.QUEUE_TYPE;
+ }
+ else if ((nodeProps.getString(TYPE).equals("topic") ||
+ nodeProps.getString(TYPE).equals("direct") ||
+ nodeProps.getString(TYPE).equals("fanout") ||
+ nodeProps.getString(TYPE).equals("match") ||
+ nodeProps.getString(TYPE).equals("xml")) )
+ {
+ return AMQDestination.TOPIC_TYPE;
+ }
+ else
+ {
+ throw new Exception("unkown exchange type");
+ }
+ }
+
+ public Node getTargetNode(int addressType)
+ {
+ // target node here is the default exchange
+ if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE)
+ {
+ return new ExchangeNode();
+ }
+ else if (addressType == AMQDestination.TOPIC_TYPE)
+ {
+ Map node = (Map)address.getOptions().get(NODE);
+ return createExchangeNode(node);
+ }
+ else
+ {
+ // don't know yet
+ return null;
+ }
+ }
+
+ private Node createExchangeNode(Map parent)
+ {
+ Map declareArgs = getDeclareArgs(parent);
+ MapAccessor argsMap = new MapAccessor(declareArgs);
+ ExchangeNode node = new ExchangeNode();
+ node.setExchangeType(nodeProps.getString(TYPE));
+ node.setDeclareArgs(getQpidExchangeOptions(argsMap));
+ fillInCommonNodeArgs(node,parent,argsMap);
+ return node;
+ }
+
+ private Node createQueueNode(Map parent)
+ {
+ Map declareArgs = getDeclareArgs(parent);
+ MapAccessor argsMap = new MapAccessor(declareArgs);
+ QueueNode node = new QueueNode();
+ node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
+ node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null? false : argsMap.getBoolean(EXCLUSIVE));
+ node.setDeclareArgs(getQpidQueueOptions(argsMap));
+ fillInCommonNodeArgs(node,parent,argsMap);
+
+ return node;
+ }
+
+ private void fillInCommonNodeArgs(Node node,Map parent,MapAccessor argsMap)
+ {
+ node.setDurable(nodeProps.getBoolean(DURABLE) == null? false : nodeProps.getBoolean(DURABLE));
+ node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null? false : argsMap.getBoolean(AUTO_DELETE));
+ node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
+ node.setBindings(getBindings(parent));
+ }
+
+ public Node getSourceNode(int addressType)
+ {
+ if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
+ {
+ return createQueueNode((Map)address.getOptions().get(NODE));
+ }
+ if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null)
+ {
+ return createQueueNode((Map)address.getOptions().get(LINK));
+ }
+ else
+ {
+ // need to query the info
+ return new QueueNode();
+ }
+ }
+
+ public Link getLink()
+ {
+ Link link = new Link();
+ if (linkProps != null)
+ {
+ link.setDurable(linkProps.getBoolean(DURABLE));
+ link.setName(linkProps.getString(NAME));
+ link.setCapacity(linkProps.getInt(CAPACITY));
+ link.setFilter(linkProps.getString(FILTER));
+ // so far filter type not used
+ }
+
+ return link;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
new file mode 100644
index 0000000000..367191e74e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.messaging.address;
+
+import org.apache.qpid.client.messaging.address.Node.QueueNode;
+
+public class Link
+{
+ public enum FilterType { SQL92, XQUERY, SUBJECT }
+
+ protected String name;
+ protected String _filter;
+ protected FilterType _filterType = FilterType.SUBJECT;
+ protected boolean _isNoLocal;
+ protected boolean _isDurable;
+ protected int _capacity = 0;
+ protected Node node;
+
+ public Node getNode()
+ {
+ return node;
+ }
+
+ public void setNode(Node node)
+ {
+ this.node = node;
+ }
+
+ public boolean isDurable()
+ {
+ return _isDurable;
+ }
+
+ public void setDurable(boolean durable)
+ {
+ _isDurable = durable;
+ }
+
+ public String getFilter()
+ {
+ return _filter;
+ }
+
+ public void setFilter(String filter)
+ {
+ this._filter = filter;
+ }
+
+ public FilterType getFilterType()
+ {
+ return _filterType;
+ }
+
+ public void setFilterType(FilterType type)
+ {
+ _filterType = type;
+ }
+
+ public boolean isNoLocal()
+ {
+ return _isNoLocal;
+ }
+
+ public void setNoLocal(boolean noLocal)
+ {
+ _isNoLocal = noLocal;
+ }
+
+ public int getCapacity()
+ {
+ return _capacity;
+ }
+
+ public void setCapacity(int capacity)
+ {
+ this._capacity = capacity;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
new file mode 100644
index 0000000000..24686cab17
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.messaging.address;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.naming.OperationNotSupportedException;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQDestination.Binding;
+
+public abstract class Node
+{
+ protected int _nodeType = AMQDestination.UNKNOWN_TYPE;
+ protected boolean _isDurable;
+ protected boolean _isAutoDelete;
+ protected String _alternateExchange;
+ protected List<Binding> _bindings = new ArrayList<Binding>();
+
+ public int getType()
+ {
+ return _nodeType;
+ }
+
+ public boolean isDurable()
+ {
+ return _isDurable;
+ }
+
+ public void setDurable(boolean durable)
+ {
+ _isDurable = durable;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _isAutoDelete;
+ }
+
+ public void setAutoDelete(boolean autoDelete)
+ {
+ _isAutoDelete = autoDelete;
+ }
+
+ public String getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(String altExchange)
+ {
+ _alternateExchange = altExchange;
+ }
+
+ public List<Binding> getBindings()
+ {
+ return _bindings;
+ }
+
+ public void setBindings(List<Binding> bindings)
+ {
+ _bindings = bindings;
+ }
+
+ public void addBinding(Binding binding) {
+ this._bindings.add(binding);
+ }
+
+ public abstract Map<String,Object> getDeclareArgs();
+
+ public static class QueueNode extends Node
+ {
+ protected boolean _isExclusive;
+ protected QpidQueueOptions _queueOptions = new QpidQueueOptions();
+
+ public QueueNode()
+ {
+ _nodeType = AMQDestination.QUEUE_TYPE;
+ }
+
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _isExclusive = exclusive;
+ }
+
+ public Map<String,Object> getDeclareArgs()
+ {
+ return _queueOptions;
+ }
+
+ public void setDeclareArgs(QpidQueueOptions options)
+ {
+ _queueOptions = options;
+ }
+ }
+
+ public static class ExchangeNode extends Node
+ {
+ protected QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions();
+ protected String _exchangeType;
+
+ public ExchangeNode()
+ {
+ _nodeType = AMQDestination.TOPIC_TYPE;
+ }
+
+ public String getExchangeType()
+ {
+ return _exchangeType;
+ }
+
+ public void setExchangeType(String exchangeType)
+ {
+ _exchangeType = exchangeType;
+ }
+
+ public Map<String,Object> getDeclareArgs()
+ {
+ return _exchangeOptions;
+ }
+
+ public void setDeclareArgs(QpidExchangeOptions options)
+ {
+ _exchangeOptions = options;
+ }
+ }
+
+ public static class UnknownNodeType extends Node
+ {
+ public Map<String,Object> getDeclareArgs()
+ {
+ return Collections.emptyMap();
+ }
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
index 123901b577..1989ade4ac 100644
--- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
+++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
@@ -62,4 +62,6 @@ public class ExchangeDefaults
/** Defines the identifying type name of fanout exchanges. */
public static final AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout");
+
+ public static final AMQShortString WILDCARD_ANY = new AMQShortString("*");
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 732a28553c..fbc33a037b 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -30,11 +30,15 @@ import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
+import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.test.utils.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import edu.emory.mathcs.backport.java.util.Collections;
+
public class AddressBasedDestinationTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class);
@@ -67,7 +71,7 @@ public class AddressBasedDestinationTest extends QpidTestCase
MessageProducer prod;
MessageConsumer cons;
- // default (create never, assert always) -------------------
+ // default (create never, assert never) -------------------
// create never --------------------------------------------
String addr1 = "ADDR:testQueue1";
AMQDestination dest = new AMQAnyDestination(addr1);
@@ -92,7 +96,7 @@ public class AddressBasedDestinationTest extends QpidTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true));
// create always -------------------------------------------
@@ -101,10 +105,10 @@ public class AddressBasedDestinationTest extends QpidTestCase
cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getName(),dest.getName(), dest.getQueueOptions()));
+ dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
// create receiver -----------------------------------------
addr1 = "ADDR:testQueue2; { create: receiver }";
@@ -120,16 +124,16 @@ public class AddressBasedDestinationTest extends QpidTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getName(),dest.getName(), dest.getQueueOptions()));
+ dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
// create never --------------------------------------------
addr1 = "ADDR:testQueue3; { create: never }";
@@ -155,7 +159,7 @@ public class AddressBasedDestinationTest extends QpidTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
// create sender ------------------------------------------
addr1 = "ADDR:testQueue3; { create: sender }";
@@ -171,14 +175,14 @@ public class AddressBasedDestinationTest extends QpidTestCase
"doesn't resolve to an exchange or a queue"));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
prod = jmsSession.createProducer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getName(),dest.getName(), dest.getQueueOptions()));
+ dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
}
@@ -193,40 +197,46 @@ public class AddressBasedDestinationTest extends QpidTestCase
}
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- String addr = "ADDR:my-queue/hello; { " +
- "create: always, " +
- "node-properties: {" +
- "durable: true ," +
- "x-properties: { " +
- "auto-delete: true," +
- "'qpid.max_size': 1000," +
- "'qpid.max_count': 100," +
- " bindings: ['amq.direct/test', 'amq.fanout', 'amq.topic/a.#']" +
-
- "}" +
- "}" +
+ String addr = "ADDR:my-queue/hello; " +
+ "{" +
+ "create: always, " +
+ "node: " +
+ "{" +
+ "durable: true ," +
+ "x-declare: " +
+ "{" +
+ "auto-delete: true," +
+ "'qpid.max_size': 1000," +
+ "'qpid.max_count': 100" +
+ "}, " +
+ "x-bindings: [{exchange : 'amq.direct', key : test}, " +
+ "{exchange : 'amq.fanout'}," +
+ "{exchange : 'amq.topic', key : 'a.#'}" +
+ "]," +
+
+ "}" +
"}";
AMQDestination dest = new AMQAnyDestination(addr);
MessageConsumer cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getName(),dest.getName(), null));
+ dest.getAddressName(),dest.getAddressName(), null));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
- dest.getName(),"test", null));
+ dest.getAddressName(),"test", null));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("amq.fanout",
- dest.getName(),null, null));
+ dest.getAddressName(),null, null));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
- dest.getName(),"a.#", null));
+ dest.getAddressName(),"a.#", null));
}
@@ -239,11 +249,14 @@ public class AddressBasedDestinationTest extends QpidTestCase
}
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- String addr = "ADDR:my-exchange/hello; { " +
+ String addr = "ADDR:my-exchange/hello; " +
+ "{ " +
"create: always, " +
- "node-properties: {" +
+ "node: " +
+ "{" +
"type: topic, " +
- "x-properties: { " +
+ "x-declare: " +
+ "{ " +
"auto-delete: true," +
"'qpid.msg_sequence': 1," +
"'qpid.ive': 1," +
@@ -255,12 +268,12 @@ public class AddressBasedDestinationTest extends QpidTestCase
MessageConsumer cons = jmsSession.createConsumer(dest);
assertTrue("Exchange not created as expected",(
- (AMQSession_0_10)jmsSession).isExchangeExist(dest, true));
+ (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true));
// The existence of the queue is implicitly tested here
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
- dest.getQueueName(),"hello", dest.getQueueOptions()));
+ dest.getQueueName(),"hello", Collections.emptyMap()));
}
@@ -274,43 +287,48 @@ public class AddressBasedDestinationTest extends QpidTestCase
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- String headersBinding = "'amq.match; {x-match: any, dep: sales, loc: CA}'";
-
- String addr = "ADDR:my-queue/hello; { " +
- "create: always, " +
- "node-properties: {" +
- "durable: true ," +
- "x-properties: { " +
- "auto-delete: true," +
- "'qpid.max_count': 100," +
- " bindings: ['amq.direct/test', 'amq.topic/a.#'," + headersBinding + "]" +
-
- "}" +
- "}" +
+ String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
+
+ String addr = "ADDR:my-queue/hello; " +
+ "{ " +
+ "create: always, " +
+ "node: " +
+ "{" +
+ "durable: true ," +
+ "x-declare: " +
+ "{ " +
+ "auto-delete: true," +
+ "'qpid.max_count': 100" +
+ "}, " +
+ "x-bindings: [{exchange : 'amq.direct', key : test}, " +
+ "{exchange : 'amq.topic', key : 'a.#'}," +
+ headersBinding +
+ "]" +
+ "}" +
"}";
AMQDestination dest = new AMQAnyDestination(addr);
MessageConsumer cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getName(),dest.getName(), null));
+ dest.getAddressName(),dest.getAddressName(), null));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
- dest.getName(),"test", null));
+ dest.getAddressName(),"test", null));
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
- dest.getName(),"a.#", null));
+ dest.getAddressName(),"a.#", null));
Address a = Address.parse(headersBinding);
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("amq.match",
- dest.getName(),null, a.getOptions()));
+ dest.getAddressName(),null, a.getOptions()));
}
/*public void testBindQueueForXMLExchange() throws Exception