summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java165
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java295
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java265
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java106
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java162
5 files changed, 764 insertions, 229 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 1ed64e7890..2d7844944a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -32,6 +32,8 @@ import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import org.apache.qpid.client.messaging.address.AddressHelper;
+import org.apache.qpid.client.messaging.address.Link;
+import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.messaging.address.QpidExchangeOptions;
import org.apache.qpid.client.messaging.address.QpidQueueOptions;
import org.apache.qpid.configuration.ClientProperties;
@@ -119,27 +121,25 @@ public abstract class AMQDestination implements Destination, Referenceable
}
}
- public enum FilterType { SQL92, XQUERY, SUBJECT }
+
protected static DestSyntax defaultDestSyntax;
protected DestSyntax _destSyntax;
+ protected AddressHelper _addrHelper;
protected Address _address;
+ protected int _addressType = AMQDestination.UNKNOWN_TYPE;
protected String _name;
protected String _subject;
protected AddressOption _create = AddressOption.NEVER;
- protected AddressOption _assert = AddressOption.ALWAYS;
- protected AddressOption _delete = AddressOption.NEVER;
-
- protected String _filter;
- protected FilterType _filterType = FilterType.SUBJECT;
- protected boolean _isNoLocal;
- protected int _nodeType = QUEUE_TYPE;
- protected String _alternateExchange;
- protected QpidQueueOptions _queueOptions;
- protected QpidExchangeOptions _exchangeOptions;
- protected List<Binding> _bindings = new ArrayList<Binding>();
+ protected AddressOption _assert = AddressOption.NEVER;
+ protected AddressOption _delete = AddressOption.NEVER;
+
+ protected Node _targetNode;
+ protected Node _sourceNode;
+ protected Link _targetLink;
+ protected Link _sourceLink;
// ----- / Fields required to support new address syntax -------
static
@@ -149,7 +149,7 @@ public abstract class AMQDestination implements Destination, Referenceable
DestSyntax.BURL.toString()));
}
- protected AMQDestination(Address address)
+ protected AMQDestination(Address address) throws Exception
{
this._address = address;
getInfoFromAddress();
@@ -169,7 +169,16 @@ public abstract class AMQDestination implements Destination, Referenceable
{
_destSyntax = DestSyntax.ADDR;
this._address = createAddressFromString(str);
- getInfoFromAddress();
+ try
+ {
+ getInfoFromAddress();
+ }
+ catch(Exception e)
+ {
+ URISyntaxException ex = new URISyntaxException(str,"Error parsing address");
+ ex.initCause(e);
+ throw ex;
+ }
}
_logger.info("Based on " + str + " the selected destination syntax is " + _destSyntax);
}
@@ -264,6 +273,11 @@ public abstract class AMQDestination implements Destination, Referenceable
_logger.info("Based on " + toString() + " the selected destination syntax is " + _destSyntax);
}
+ public DestSyntax getDestSyntax()
+ {
+ return _destSyntax;
+ }
+
public AMQShortString getEncodedName()
{
if(_urlAsShortString == null)
@@ -629,15 +643,21 @@ public abstract class AMQDestination implements Destination, Referenceable
}
// ----- new address syntax -----------
+
public static class Binding
{
String exchange;
String bindingKey;
+ String queue;
Map<String,Object> args;
- public Binding(String exchange,String bindingKey,Map<String,Object> args)
+ public Binding(String exchange,
+ String queue,
+ String bindingKey,
+ Map<String,Object> args)
{
this.exchange = exchange;
+ this.queue = queue;
this.bindingKey = bindingKey;
this.args = args;
}
@@ -647,6 +667,11 @@ public abstract class AMQDestination implements Destination, Referenceable
return exchange;
}
+ public String getQueue()
+ {
+ return queue;
+ }
+
public String getBindingKey()
{
return bindingKey;
@@ -662,7 +687,15 @@ public abstract class AMQDestination implements Destination, Referenceable
return _address;
}
- public String getName() {
+ public int getAddressType(){
+ return _addressType;
+ }
+
+ public void setAddressType(int addressType){
+ _addressType = addressType;
+ }
+
+ public String getAddressName() {
return _name;
}
@@ -670,6 +703,10 @@ public abstract class AMQDestination implements Destination, Referenceable
return _subject;
}
+ public void setSubject(String subject) {
+ _subject = subject;
+ }
+
public AddressOption getCreate() {
return _create;
}
@@ -681,49 +718,35 @@ public abstract class AMQDestination implements Destination, Referenceable
public AddressOption getDelete() {
return _delete;
}
-
- public String getFilter() {
- return _filter;
- }
-
- public FilterType getFilterType() {
- return _filterType;
- }
-
- public boolean isNoLocal() {
- return _isNoLocal;
- }
-
- public int getNodeType() {
- return _nodeType;
+
+ public Node getTargetNode()
+ {
+ return _targetNode;
}
- public QpidQueueOptions getQueueOptions() {
- return _queueOptions;
+ public void setTargetNode(Node node)
+ {
+ _targetNode = node;
}
- public List<Binding> getBindings() {
- return _bindings;
+ public Node getSourceNode()
+ {
+ return _sourceNode;
}
- public void addBinding(Binding binding) {
- this._bindings.add(binding);
- }
-
- public DestSyntax getDestSyntax() {
- return _destSyntax;
- }
-
- public QpidExchangeOptions getExchangeOptions() {
- return _exchangeOptions;
+ public void setSourceNode(Node node)
+ {
+ _sourceNode = node;
}
- public String getAlternateExchange() {
- return _alternateExchange;
+ public Link getSourceLink()
+ {
+ return _sourceLink;
}
- public void setAlternateExchange(String alternateExchange) {
- this._alternateExchange = alternateExchange;
+ public void setSourceLink(Link link)
+ {
+ _sourceLink = link;
}
public void setExchangeName(AMQShortString name)
@@ -750,37 +773,35 @@ public abstract class AMQDestination implements Destination, Referenceable
return Address.parse(str);
}
- private void getInfoFromAddress()
+ private void getInfoFromAddress() throws Exception
{
_name = _address.getName();
_subject = _address.getSubject();
- AddressHelper addrHelper = new AddressHelper(_address);
+ _addrHelper = new AddressHelper(_address);
- _create = addrHelper.getCreate() != null ?
- AddressOption.getOption(addrHelper.getCreate()):AddressOption.NEVER;
+ _create = _addrHelper.getCreate() != null ?
+ AddressOption.getOption(_addrHelper.getCreate()):AddressOption.NEVER;
- _assert = addrHelper.getAssert() != null ?
- AddressOption.getOption(addrHelper.getAssert()):AddressOption.ALWAYS;
+ _assert = _addrHelper.getAssert() != null ?
+ AddressOption.getOption(_addrHelper.getAssert()):AddressOption.NEVER;
- _delete = addrHelper.getDelete() != null ?
- AddressOption.getOption(addrHelper.getDelete()):AddressOption.NEVER;
+ _delete = _addrHelper.getDelete() != null ?
+ AddressOption.getOption(_addrHelper.getDelete()):AddressOption.NEVER;
- _filter = addrHelper.getFilter();
- _isNoLocal = addrHelper.isNoLocal();
- _isDurable = addrHelper.isDurable();
- _isAutoDelete = addrHelper.isAutoDelete();
- _isExclusive = addrHelper.isExclusive();
- _browseOnly = addrHelper.isBrowseOnly();
-
- _nodeType = addrHelper.getNodeType() == null || addrHelper.getNodeType().equals("queue")?
- QUEUE_TYPE : TOPIC_TYPE;
-
- _alternateExchange = addrHelper.getAltExchange();
-
- _queueOptions = addrHelper.getQpidQueueOptions();
- _exchangeOptions = addrHelper.getQpidExchangeOptions();
- _bindings = addrHelper.getBindings();
+ _addressType = _addrHelper.getTargetNodeType();
+ _targetNode = _addrHelper.getTargetNode(_addressType);
+ _sourceNode = _addrHelper.getSourceNode(_addressType);
+ _sourceLink = _addrHelper.getLink();
+ }
+
+ // This method is needed if we didn't know the node type at the beginning.
+ // Therefore we have to query the broker to figure out the type.
+ // Once the type is known we look for the necessary properties.
+ public void rebuildTargetAndSourceNodes(int addressType)
+ {
+ _targetNode = _addrHelper.getTargetNode(addressType);
+ _sourceNode = _addrHelper.getSourceNode(addressType);
}
// ----- / new address syntax -----------
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 704dbf8bfc..8064ed1ae6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -23,8 +23,10 @@ import static org.apache.qpid.transport.Option.SYNC;
import static org.apache.qpid.transport.Option.UNRELIABLE;
import java.lang.ref.WeakReference;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -89,6 +91,9 @@ import java.util.Timer;
import java.util.TimerTask;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.messaging.address.Node;
+import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
+import org.apache.qpid.client.messaging.address.Node.QueueNode;
/**
* This is a 0.10 Session
@@ -354,13 +359,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
- for (Binding binding: destination.getBindings())
+ List<Binding> bindings = new ArrayList<Binding>();
+ bindings.addAll(destination.getSourceNode().getBindings());
+ bindings.addAll(destination.getTargetNode().getBindings());
+ for (Binding binding: bindings)
{
- _logger.debug("Binding queue : " + queueName.toString() +
+ String queue = binding.getQueue() == null?
+ queueName.asString(): binding.getQueue();
+ _logger.debug("Binding queue : " + queue +
" exchange: " + binding.getExchange() +
" using binding key " + binding.getBindingKey() +
" with args " + printMap(binding.getArgs()));
- getQpidSession().exchangeBind(queueName.toString(),
+ getQpidSession().exchangeBind(queue,
binding.getExchange(),
binding.getBindingKey(),
binding.getArgs());
@@ -687,21 +697,34 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
queueName = amqd.getAMQQueueName();
}
- Map<String,Object> arguments = new HashMap<String,Object>();
- if (noLocal || amqd.isNoLocal())
- {
- arguments.put("no-local", true);
+ if (amqd.getDestSyntax() == DestSyntax.BURL)
+ {
+ Map<String,Object> arguments = new HashMap<String,Object>();
+ if (noLocal)
+ {
+ arguments.put("no-local", true);
+ }
+
+ /*if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null)
+ {
+ arguments.putAll(amqd.getta);
+ }*/
+
+ getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
+ amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ amqd.isDurable() ? Option.DURABLE : Option.NONE,
+ amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
-
- if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null)
+ else
{
- arguments.putAll(amqd.getQueueOptions());
+ QueueNode node = (QueueNode)amqd.getSourceNode();
+ getQpidSession().queueDeclare(queueName.toString(), "" ,
+ node.getDeclareArgs(),
+ node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+ node.isDurable() ? Option.DURABLE : Option.NONE,
+ node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
- getQpidSession().queueDeclare(queueName.toString(), amqd.getAlternateExchange() , arguments,
- amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
- amqd.isDurable() ? Option.DURABLE : Option.NONE,
- amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
if (!nowait)
{
@@ -1017,39 +1040,58 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return AMQMessageDelegateFactory.FACTORY_0_10;
}
- public boolean isExchangeExist(AMQDestination dest,boolean assertNode)
+ public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode)
{
boolean match = true;
- ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getName(), Option.NONE).get();
- match = !result.getNotFound();
+ ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
+ match = !result.getNotFound();
- if (match && assertNode)
- {
- match = (result.getDurable() == dest.isDurable()) &&
- (dest.getExchangeClass() != null) &&
- (dest.getExchangeClass().asString().equals(result.getType())) &&
- (matchProps(result.getArguments(),dest.getQueueOptions()));
- }
if (match)
{
- dest.setExchangeClass(new AMQShortString(result.getType()));
+ if (assertNode)
+ {
+ match = (result.getDurable() == node.isDurable()) &&
+ (node.getExchangeType() != null &&
+ node.getExchangeType().equals(result.getType())) &&
+ (matchProps(result.getArguments(),node.getDeclareArgs()));
+ }
+ else if (node.getExchangeType() != null)
+ {
+ // even if assert is false, better to verify this
+ match = node.getExchangeType().equals(result.getType());
+ if (!match)
+ {
+ _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() +
+ " actual " + result.getType());
+ }
+ }
+ else
+ {
+ _logger.debug("Setting Exchange type " + result.getType());
+ node.setExchangeType(result.getType());
+ dest.setExchangeClass(new AMQShortString(result.getType()));
+ }
}
return match;
}
- public boolean isQueueExist(AMQDestination dest,boolean assertNode)
+ public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode)
{
boolean match = true;
- QueueQueryResult result = getQpidSession().queueQuery(dest.getName(), Option.NONE).get();
- match = dest.getName().equals(result.getQueue());
+ QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
+ match = dest.getAddressName().equals(result.getQueue());
if (match && assertNode)
{
- match = (result.getDurable() == dest.isDurable()) &&
- (result.getAutoDelete() == dest.isAutoDelete()) &&
- (result.getExclusive() == dest.isExclusive()) &&
- (matchProps(result.getArguments(),dest.getQueueOptions()));
+ match = (result.getDurable() == node.isDurable()) &&
+ (result.getAutoDelete() == node.isAutoDelete()) &&
+ (result.getExclusive() == node.isExclusive()) &&
+ (matchProps(result.getArguments(),node.getDeclareArgs()));
+ }
+ else if (match)
+ {
+ // should I use the queried details to update the local data structure.
}
return match;
@@ -1063,76 +1105,177 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
match = target.containsKey(key) &&
target.get(key).equals(source.get(key));
- if (!match) return match;
+ if (!match)
+ {
+ StringBuffer buf = new StringBuffer();
+ buf.append("Property given in address did not match with the args sent by the broker.");
+ buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, ");
+ buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }");
+ _logger.debug(buf.toString());
+ return match;
+ }
}
return match;
}
+ @SuppressWarnings("deprecation")
public void handleAddressBasedDestination(AMQDestination dest,
boolean isConsumer,
boolean noWait) throws AMQException
{
- boolean noLocal = dest.isNoLocal();
boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
(isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
(!isConsumer && dest.getAssert() == AddressOption.SENDER);
+
+ boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
+ (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getCreate() == AddressOption.SENDER);
- if (isExchangeExist(dest,assertNode))
+
+ int type = resolveAddressType(dest);
+
+ switch (type)
{
- dest.setExchangeName(new AMQShortString(dest.getName()));
- dest.setRoutingKey(new AMQShortString(dest.getSubject()));
- if (isConsumer)
+ case AMQDestination.QUEUE_TYPE:
{
- dest.setQueueName(null);
- dest.addBinding(new Binding(dest.getName(),
- dest.getSubject(),
- null));
+ if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode))
+ {
+ setLegacyFiledsForQueueType(dest);
+ break;
+ }
+ else if(createNode)
+ {
+ setLegacyFiledsForQueueType(dest);
+ send0_10QueueDeclare(dest,null,false,noWait);
+ break;
+ }
}
+
+ case AMQDestination.TOPIC_TYPE:
+ {
+ if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode))
+ {
+ setLegacyFiledsForTopicType(dest);
+ verifySubject(dest);
+ createSubscriptionQueue(dest);
+ break;
+ }
+ else if(createNode)
+ {
+ setLegacyFiledsForTopicType(dest);
+ verifySubject(dest);
+ sendExchangeDeclare(dest.getAddressName(),
+ dest.getExchangeClass().asString(),
+ dest.getTargetNode().getAlternateExchange(),
+ dest.getTargetNode().getDeclareArgs(),
+ false);
+ createSubscriptionQueue(dest);
+ break;
+ }
+ }
+
+ default:
+ throw new AMQException(
+ "The name '" + dest.getAddressName() +
+ "' supplied in the address doesn't resolve to an exchange or a queue");
}
- else if (isQueueExist(dest,assertNode))
- {
- dest.setQueueName(new AMQShortString(dest.getName()));
- dest.setExchangeName(new AMQShortString(""));
- dest.setExchangeClass(new AMQShortString(""));
- dest.setRoutingKey(dest.getAMQQueueName());
- }
- else if (dest.getCreate() == AddressOption.ALWAYS ||
- dest.getCreate() == AddressOption.RECEIVER && isConsumer ||
- dest.getCreate() == AddressOption.SENDER && !isConsumer)
+ }
+
+ int resolveAddressType(AMQDestination dest) throws AMQException
+ {
+ int type = dest.getAddressType();
+ String name = dest.getAddressName();
+ if (type != AMQDestination.UNKNOWN_TYPE)
+ {
+ return type;
+ }
+ else
+ {
+ ExchangeBoundResult result = getQpidSession().exchangeBound(name,name,null,null).get();
+ if (result.getQueueNotFound() && result.getExchangeNotFound()) {
+ //neither a queue nor an exchange exists with that name; treat it as a queue
+ type = AMQDestination.QUEUE_TYPE;
+ } else if (result.getExchangeNotFound()) {
+ //name refers to a queue
+ type = AMQDestination.QUEUE_TYPE;
+ } else if (result.getQueueNotFound()) {
+ //name refers to an exchange
+ type = AMQDestination.TOPIC_TYPE;
+ } else {
+ //both a queue and exchange exist for that name
+ throw new AMQException("Ambiguous address, please specify queue or topic as node type");
+ }
+ dest.rebuildTargetAndSourceNodes(type);
+ return type;
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ private void verifySubject(AMQDestination dest) throws AMQException
+ {
+ if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
{
- if (dest.getNodeType() == AMQDestination.QUEUE_TYPE)
+ if (dest.getExchangeClass() == ExchangeDefaults.TOPIC_EXCHANGE_CLASS)
{
- dest.setQueueName(new AMQShortString(dest.getName()));
- dest.setExchangeName(new AMQShortString(""));
- dest.setExchangeClass(new AMQShortString(""));
- dest.setRoutingKey(dest.getAMQQueueName());
+ dest.setRoutingKey(ExchangeDefaults.WILDCARD_ANY);
+ dest.setSubject(ExchangeDefaults.WILDCARD_ANY.toString());
}
- else
+ else if (dest.getExchangeClass() == ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
{
- dest.setQueueName(null);
- dest.setExchangeName(new AMQShortString(dest.getName()));
- dest.setExchangeClass(dest.getExchangeClass() == null?
- ExchangeDefaults.TOPIC_EXCHANGE_CLASS:dest.getExchangeClass());
- dest.setRoutingKey(new AMQShortString(dest.getSubject()));
- dest.addBinding(new Binding(dest.getName(),
- dest.getSubject(),
- null));
-
- sendExchangeDeclare(dest.getName(), dest.getExchangeClass().asString(),
- dest.getAlternateExchange(), dest.getExchangeOptions(),false);
-
+ throw new AMQException("If sending to an exchange of type direct," +
+ " a valid subject must be specified");
}
-
- send0_10QueueDeclare(dest,null,noLocal,noWait);
}
- else
+ }
+
+ private void createSubscriptionQueue(AMQDestination dest) throws AMQException
+ {
+ if (dest.getSourceNode() != null)
{
- throw new AMQException(
- "The name '" + dest.getName() +
- "' supplied in the address doesn't resolve to an exchange or a queue");
+ QueueNode node = (QueueNode)dest.getSourceNode();
+ if (dest.getQueueName() == null || !isQueueExist(dest,node,true))
+ {
+ // can name : my-queue be used in x-declare?
+ // if so set it to dest queue name
+ // if (dest.getQueueName() == null) { dest.setName(node.getName()) }
+ send0_10QueueDeclare(dest,null,false,false);
+ }
+ node.addBinding(new Binding(dest.getAddressName(),
+ dest.getQueueName(),// should have one by now
+ dest.getSubject(),
+ node.getDeclareArgs()));
}
+ else
+ {
+ send0_10QueueDeclare(dest,null,false,false);
+ dest.getTargetNode().addBinding(new Binding(dest.getAddressName(),
+ null,
+ dest.getSubject(),
+ null));
+ }
+ }
+
+ private void setLegacyFiledsForQueueType(AMQDestination dest)
+ {
+ // legacy support
+ dest.setQueueName(new AMQShortString(dest.getAddressName()));
+ dest.setExchangeName(new AMQShortString(""));
+ dest.setExchangeClass(new AMQShortString(""));
+ dest.setRoutingKey(dest.getAMQQueueName());
+ }
+
+ private void setLegacyFiledsForTopicType(AMQDestination dest)
+ {
+ // legacy support
+ dest.setQueueName(null);
+ dest.setExchangeName(new AMQShortString(dest.getAddressName()));
+ ExchangeNode node = (ExchangeNode)dest.getTargetNode();
+ dest.setExchangeClass(node.getExchangeType() == null?
+ ExchangeDefaults.TOPIC_EXCHANGE_CLASS:
+ new AMQShortString(node.getExchangeType()));
+ dest.setRoutingKey(new AMQShortString(dest.getSubject()));
}
/** This should be moved to a suitable utility class */
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 84bc4d596e..1d6dde7c29 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -21,10 +21,15 @@
package org.apache.qpid.client.messaging.address;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
+import org.apache.qpid.client.messaging.address.Node.QueueNode;
+import org.apache.qpid.client.messaging.address.Node.UnknownNodeType;
import org.apache.qpid.configuration.Accessor;
import org.apache.qpid.configuration.Accessor.MapAccessor;
import org.apache.qpid.messaging.Address;
@@ -35,8 +40,11 @@ import org.apache.qpid.messaging.Address;
*/
public class AddressHelper
{
- public static final String NODE_PROPS = "node-properties";
- public static final String X_PROPS = "x-properties";
+ public static final String NODE = "node";
+ public static final String LINK = "link";
+ public static final String X_DECLARE = "x-declare";
+ public static final String X_BINDINGS = "x-bindings";
+ public static final String X_SUBSCRIBE = "x-subscribes";
public static final String CREATE = "create";
public static final String ASSERT = "assert";
public static final String DELETE = "delete";
@@ -49,22 +57,34 @@ public class AddressHelper
public static final String ALT_EXCHANGE = "alt-exchange";
public static final String BINDINGS = "bindings";
public static final String BROWSE_ONLY = "browse";
+ public static final String CAPACITY = "capacity";
+ public static final String NAME = "name";
+ public static final String EXCHANGE = "exchange";
+ public static final String QUEUE = "queue";
+ public static final String KEY = "key";
+ public static final String ARGUMENTS = "arguments";
private Address address;
private Accessor addressProps;
private Accessor nodeProps;
- private Accessor xProps;
+ private Accessor linkProps;
public AddressHelper(Address address)
{
this.address = address;
addressProps = new MapAccessor(address.getOptions());
Map node_props = address.getOptions() == null ||
- address.getOptions().get(NODE_PROPS) == null ?
- null : (Map)address.getOptions().get(NODE_PROPS);
- nodeProps = new MapAccessor(node_props);
- xProps = new MapAccessor(node_props == null || node_props.get(X_PROPS) == null?
- null: (Map)node_props.get(X_PROPS));
+ address.getOptions().get(NODE) == null ?
+ null : (Map)address.getOptions().get(NODE);
+
+ if (node_props != null) { nodeProps = new MapAccessor(node_props); }
+
+
+ Map link_props = address.getOptions() == null ||
+ address.getOptions().get(LINK) == null ?
+ null : (Map)address.getOptions().get(LINK);
+
+ if (link_props != null) { linkProps = new MapAccessor(link_props); }
}
public String getCreate()
@@ -82,135 +102,218 @@ public class AddressHelper
return addressProps.getString(DELETE);
}
- public String getFilter()
- {
- return addressProps.getString(FILTER);
- }
-
public boolean isNoLocal()
{
Boolean b = nodeProps.getBoolean(NO_LOCAL);
return b == null ? false : b ;
}
-
- public boolean isDurable()
- {
- Boolean b = nodeProps.getBoolean(DURABLE);
- return b == null ? false : b ;
- }
-
- public boolean isExclusive()
- {
- Boolean b = xProps.getBoolean(EXCLUSIVE);
- return b == null ? false : b ;
- }
-
- public boolean isAutoDelete()
- {
- Boolean b = xProps.getBoolean(AUTO_DELETE);
- return b == null ? false : b ;
- }
-
+
public boolean isBrowseOnly()
{
- Boolean b = xProps.getBoolean(BROWSE_ONLY);
+ Boolean b = nodeProps.getBoolean(BROWSE_ONLY);
return b == null ? false : b ;
}
-
- public String getNodeType()
- {
- return nodeProps.getString(TYPE);
- }
-
- public String getAltExchange()
- {
- return xProps.getString(ALT_EXCHANGE);
- }
-
- public QpidQueueOptions getQpidQueueOptions()
+
+ public QpidQueueOptions getQpidQueueOptions(MapAccessor args)
{
QpidQueueOptions options = new QpidQueueOptions();
- if (xProps.getInt(QpidQueueOptions.QPID_MAX_COUNT) != null)
+ if (args.getInt(QpidQueueOptions.QPID_MAX_COUNT) != null)
{
- options.setMaxCount(xProps.getInt(QpidQueueOptions.QPID_MAX_COUNT));
+ options.setMaxCount(args.getInt(QpidQueueOptions.QPID_MAX_COUNT));
}
- if (xProps.getInt(QpidQueueOptions.QPID_MAX_SIZE) != null)
+ if (args.getInt(QpidQueueOptions.QPID_MAX_SIZE) != null)
{
- options.setMaxSize(xProps.getInt(QpidQueueOptions.QPID_MAX_SIZE));
+ options.setMaxSize(args.getInt(QpidQueueOptions.QPID_MAX_SIZE));
}
- if (xProps.getInt(QpidQueueOptions.QPID_POLICY_TYPE) != null)
+ if (args.getInt(QpidQueueOptions.QPID_POLICY_TYPE) != null)
{
- options.setPolicyType(xProps.getString(QpidQueueOptions.QPID_POLICY_TYPE));
+ options.setPolicyType(args.getString(QpidQueueOptions.QPID_POLICY_TYPE));
}
- if (xProps.getInt(QpidQueueOptions.QPID_PERSIST_LAST_NODE) != null)
+ if (args.getInt(QpidQueueOptions.QPID_PERSIST_LAST_NODE) != null)
{
options.setPersistLastNode();
}
- if (xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE) != null)
+ if (args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE) != null)
{
- options.setOrderingPolicy(xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE));
- options.setLvqKey(xProps.getString(QpidQueueOptions.QPID_LVQ_KEY));
+ options.setOrderingPolicy(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE));
+ options.setLvqKey(args.getString(QpidQueueOptions.QPID_LVQ_KEY));
}
- else if (xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE) != null)
+ else if (args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE) != null)
{
- options.setOrderingPolicy(xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE));
- options.setLvqKey(xProps.getString(QpidQueueOptions.QPID_LVQ_KEY));
+ options.setOrderingPolicy(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE));
+ options.setLvqKey(args.getString(QpidQueueOptions.QPID_LVQ_KEY));
}
- if (xProps.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION) != null)
+ if (args.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION) != null)
{
- options.setQueueEvents(xProps.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION));
+ options.setQueueEvents(args.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION));
}
return options;
}
- public QpidExchangeOptions getQpidExchangeOptions()
+ public QpidExchangeOptions getQpidExchangeOptions(MapAccessor args)
{
QpidExchangeOptions options = new QpidExchangeOptions();
- if (xProps.getInt(QpidExchangeOptions.QPID_EXCLUSIVE_BINDING) != null)
+ if (args.getInt(QpidExchangeOptions.QPID_EXCLUSIVE_BINDING) != null)
{
options.setExclusiveBinding();
}
- if (xProps.getInt(QpidExchangeOptions.QPID_INITIAL_VALUE_EXCHANGE) != null)
+ if (args.getInt(QpidExchangeOptions.QPID_INITIAL_VALUE_EXCHANGE) != null)
{
options.setInitialValueExchange();
}
- if (xProps.getInt(QpidExchangeOptions.QPID_MSG_SEQUENCE) != null)
+ if (args.getInt(QpidExchangeOptions.QPID_MSG_SEQUENCE) != null)
{
options.setMessageSequencing();
}
return options;
}
- public List<Binding> getBindings()
+ @SuppressWarnings("unchecked")
+ public List<Binding> getBindings(Map props)
{
- List<Binding> bindings = new ArrayList<Binding>();
- if (address.getOptions() != null &&
- address.getOptions().get(NODE_PROPS) != null &&
- ((Map)address.getOptions().get(NODE_PROPS)).get(X_PROPS) != null)
+ List<Binding> bindings = new ArrayList<Binding>();
+ List<Map> bindingList = (List<Map>)props.get(X_BINDINGS);
+ if (bindingList != null)
{
- Map node_props = (Map)address.getOptions().get(NODE_PROPS);
- List<String> bindingList =
- (List<String>)((Map)node_props.get(X_PROPS)).get(BINDINGS);
- if (bindingList != null)
+ for (Map bindingMap: bindingList)
{
- for (String bindingStr: bindingList)
- {
- Address addr = Address.parse(bindingStr);
- Binding binding = new Binding(addr.getName(),
- addr.getSubject(),
- addr.getOptions());
- bindings.add(binding);
- }
+ Binding binding = new Binding((String)bindingMap.get(EXCHANGE),
+ (String)bindingMap.get(QUEUE),
+ (String)bindingMap.get(KEY),
+ bindingMap.get(ARGUMENTS) == null ?
+ Collections.EMPTY_MAP:
+ (Map<String,Object>)bindingMap.get(ARGUMENTS));
+ bindings.add(binding);
}
}
return bindings;
}
+
+ public Map getDeclareArgs(Map props)
+ {
+ if (props != null)
+ {
+ return (Map)props.get(X_DECLARE);
+ }
+ else
+ {
+ return Collections.EMPTY_MAP;
+ }
+ }
+
+ public int getTargetNodeType() throws Exception
+ {
+ if (nodeProps == null || nodeProps.getString(TYPE) == null)
+ {
+ // need to query and figure out
+ return AMQDestination.UNKNOWN_TYPE;
+ }
+ else if (nodeProps.getString(TYPE).equals("queue"))
+ {
+ return AMQDestination.QUEUE_TYPE;
+ }
+ else if ((nodeProps.getString(TYPE).equals("topic") ||
+ nodeProps.getString(TYPE).equals("direct") ||
+ nodeProps.getString(TYPE).equals("fanout") ||
+ nodeProps.getString(TYPE).equals("match") ||
+ nodeProps.getString(TYPE).equals("xml")) )
+ {
+ return AMQDestination.TOPIC_TYPE;
+ }
+ else
+ {
+ throw new Exception("unkown exchange type");
+ }
+ }
+
+ public Node getTargetNode(int addressType)
+ {
+ // target node here is the default exchange
+ if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE)
+ {
+ return new ExchangeNode();
+ }
+ else if (addressType == AMQDestination.TOPIC_TYPE)
+ {
+ Map node = (Map)address.getOptions().get(NODE);
+ return createExchangeNode(node);
+ }
+ else
+ {
+ // don't know yet
+ return null;
+ }
+ }
+
+ private Node createExchangeNode(Map parent)
+ {
+ Map declareArgs = getDeclareArgs(parent);
+ MapAccessor argsMap = new MapAccessor(declareArgs);
+ ExchangeNode node = new ExchangeNode();
+ node.setExchangeType(nodeProps.getString(TYPE));
+ node.setDeclareArgs(getQpidExchangeOptions(argsMap));
+ fillInCommonNodeArgs(node,parent,argsMap);
+ return node;
+ }
+
+ private Node createQueueNode(Map parent)
+ {
+ Map declareArgs = getDeclareArgs(parent);
+ MapAccessor argsMap = new MapAccessor(declareArgs);
+ QueueNode node = new QueueNode();
+ node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
+ node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null? false : argsMap.getBoolean(EXCLUSIVE));
+ node.setDeclareArgs(getQpidQueueOptions(argsMap));
+ fillInCommonNodeArgs(node,parent,argsMap);
+
+ return node;
+ }
+
+ private void fillInCommonNodeArgs(Node node,Map parent,MapAccessor argsMap)
+ {
+ node.setDurable(nodeProps.getBoolean(DURABLE) == null? false : nodeProps.getBoolean(DURABLE));
+ node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null? false : argsMap.getBoolean(AUTO_DELETE));
+ node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE));
+ node.setBindings(getBindings(parent));
+ }
+
+ public Node getSourceNode(int addressType)
+ {
+ if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null)
+ {
+ return createQueueNode((Map)address.getOptions().get(NODE));
+ }
+ if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null)
+ {
+ return createQueueNode((Map)address.getOptions().get(LINK));
+ }
+ else
+ {
+ // need to query the info
+ return new QueueNode();
+ }
+ }
+
+ public Link getLink()
+ {
+ Link link = new Link();
+ if (linkProps != null)
+ {
+ link.setDurable(linkProps.getBoolean(DURABLE));
+ link.setName(linkProps.getString(NAME));
+ link.setCapacity(linkProps.getInt(CAPACITY));
+ link.setFilter(linkProps.getString(FILTER));
+ // so far filter type not used
+ }
+
+ return link;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
new file mode 100644
index 0000000000..367191e74e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.messaging.address;
+
+import org.apache.qpid.client.messaging.address.Node.QueueNode;
+
+public class Link
+{
+ public enum FilterType { SQL92, XQUERY, SUBJECT }
+
+ protected String name;
+ protected String _filter;
+ protected FilterType _filterType = FilterType.SUBJECT;
+ protected boolean _isNoLocal;
+ protected boolean _isDurable;
+ protected int _capacity = 0;
+ protected Node node;
+
+ public Node getNode()
+ {
+ return node;
+ }
+
+ public void setNode(Node node)
+ {
+ this.node = node;
+ }
+
+ public boolean isDurable()
+ {
+ return _isDurable;
+ }
+
+ public void setDurable(boolean durable)
+ {
+ _isDurable = durable;
+ }
+
+ public String getFilter()
+ {
+ return _filter;
+ }
+
+ public void setFilter(String filter)
+ {
+ this._filter = filter;
+ }
+
+ public FilterType getFilterType()
+ {
+ return _filterType;
+ }
+
+ public void setFilterType(FilterType type)
+ {
+ _filterType = type;
+ }
+
+ public boolean isNoLocal()
+ {
+ return _isNoLocal;
+ }
+
+ public void setNoLocal(boolean noLocal)
+ {
+ _isNoLocal = noLocal;
+ }
+
+ public int getCapacity()
+ {
+ return _capacity;
+ }
+
+ public void setCapacity(int capacity)
+ {
+ this._capacity = capacity;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
new file mode 100644
index 0000000000..24686cab17
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.messaging.address;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.naming.OperationNotSupportedException;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQDestination.Binding;
+
+public abstract class Node
+{
+ protected int _nodeType = AMQDestination.UNKNOWN_TYPE;
+ protected boolean _isDurable;
+ protected boolean _isAutoDelete;
+ protected String _alternateExchange;
+ protected List<Binding> _bindings = new ArrayList<Binding>();
+
+ public int getType()
+ {
+ return _nodeType;
+ }
+
+ public boolean isDurable()
+ {
+ return _isDurable;
+ }
+
+ public void setDurable(boolean durable)
+ {
+ _isDurable = durable;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return _isAutoDelete;
+ }
+
+ public void setAutoDelete(boolean autoDelete)
+ {
+ _isAutoDelete = autoDelete;
+ }
+
+ public String getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(String altExchange)
+ {
+ _alternateExchange = altExchange;
+ }
+
+ public List<Binding> getBindings()
+ {
+ return _bindings;
+ }
+
+ public void setBindings(List<Binding> bindings)
+ {
+ _bindings = bindings;
+ }
+
+ public void addBinding(Binding binding) {
+ this._bindings.add(binding);
+ }
+
+ public abstract Map<String,Object> getDeclareArgs();
+
+ public static class QueueNode extends Node
+ {
+ protected boolean _isExclusive;
+ protected QpidQueueOptions _queueOptions = new QpidQueueOptions();
+
+ public QueueNode()
+ {
+ _nodeType = AMQDestination.QUEUE_TYPE;
+ }
+
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ _isExclusive = exclusive;
+ }
+
+ public Map<String,Object> getDeclareArgs()
+ {
+ return _queueOptions;
+ }
+
+ public void setDeclareArgs(QpidQueueOptions options)
+ {
+ _queueOptions = options;
+ }
+ }
+
+ public static class ExchangeNode extends Node
+ {
+ protected QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions();
+ protected String _exchangeType;
+
+ public ExchangeNode()
+ {
+ _nodeType = AMQDestination.TOPIC_TYPE;
+ }
+
+ public String getExchangeType()
+ {
+ return _exchangeType;
+ }
+
+ public void setExchangeType(String exchangeType)
+ {
+ _exchangeType = exchangeType;
+ }
+
+ public Map<String,Object> getDeclareArgs()
+ {
+ return _exchangeOptions;
+ }
+
+ public void setDeclareArgs(QpidExchangeOptions options)
+ {
+ _exchangeOptions = options;
+ }
+ }
+
+ public static class UnknownNodeType extends Node
+ {
+ public Map<String,Object> getDeclareArgs()
+ {
+ return Collections.emptyMap();
+ }
+ }
+}