diff options
Diffstat (limited to 'java/client')
5 files changed, 764 insertions, 229 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 1ed64e7890..2d7844944a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -32,6 +32,8 @@ import javax.naming.Referenceable; import javax.naming.StringRefAddr; import org.apache.qpid.client.messaging.address.AddressHelper; +import org.apache.qpid.client.messaging.address.Link; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.messaging.address.QpidExchangeOptions; import org.apache.qpid.client.messaging.address.QpidQueueOptions; import org.apache.qpid.configuration.ClientProperties; @@ -119,27 +121,25 @@ public abstract class AMQDestination implements Destination, Referenceable } } - public enum FilterType { SQL92, XQUERY, SUBJECT } + protected static DestSyntax defaultDestSyntax; protected DestSyntax _destSyntax; + protected AddressHelper _addrHelper; protected Address _address; + protected int _addressType = AMQDestination.UNKNOWN_TYPE; protected String _name; protected String _subject; protected AddressOption _create = AddressOption.NEVER; - protected AddressOption _assert = AddressOption.ALWAYS; - protected AddressOption _delete = AddressOption.NEVER; - - protected String _filter; - protected FilterType _filterType = FilterType.SUBJECT; - protected boolean _isNoLocal; - protected int _nodeType = QUEUE_TYPE; - protected String _alternateExchange; - protected QpidQueueOptions _queueOptions; - protected QpidExchangeOptions _exchangeOptions; - protected List<Binding> _bindings = new ArrayList<Binding>(); + protected AddressOption _assert = AddressOption.NEVER; + protected AddressOption _delete = AddressOption.NEVER; + + protected Node _targetNode; + protected Node _sourceNode; + protected Link _targetLink; + protected Link _sourceLink; // ----- / Fields required to support new address syntax ------- static @@ -149,7 +149,7 @@ public abstract class AMQDestination implements Destination, Referenceable DestSyntax.BURL.toString())); } - protected AMQDestination(Address address) + protected AMQDestination(Address address) throws Exception { this._address = address; getInfoFromAddress(); @@ -169,7 +169,16 @@ public abstract class AMQDestination implements Destination, Referenceable { _destSyntax = DestSyntax.ADDR; this._address = createAddressFromString(str); - getInfoFromAddress(); + try + { + getInfoFromAddress(); + } + catch(Exception e) + { + URISyntaxException ex = new URISyntaxException(str,"Error parsing address"); + ex.initCause(e); + throw ex; + } } _logger.info("Based on " + str + " the selected destination syntax is " + _destSyntax); } @@ -264,6 +273,11 @@ public abstract class AMQDestination implements Destination, Referenceable _logger.info("Based on " + toString() + " the selected destination syntax is " + _destSyntax); } + public DestSyntax getDestSyntax() + { + return _destSyntax; + } + public AMQShortString getEncodedName() { if(_urlAsShortString == null) @@ -629,15 +643,21 @@ public abstract class AMQDestination implements Destination, Referenceable } // ----- new address syntax ----------- + public static class Binding { String exchange; String bindingKey; + String queue; Map<String,Object> args; - public Binding(String exchange,String bindingKey,Map<String,Object> args) + public Binding(String exchange, + String queue, + String bindingKey, + Map<String,Object> args) { this.exchange = exchange; + this.queue = queue; this.bindingKey = bindingKey; this.args = args; } @@ -647,6 +667,11 @@ public abstract class AMQDestination implements Destination, Referenceable return exchange; } + public String getQueue() + { + return queue; + } + public String getBindingKey() { return bindingKey; @@ -662,7 +687,15 @@ public abstract class AMQDestination implements Destination, Referenceable return _address; } - public String getName() { + public int getAddressType(){ + return _addressType; + } + + public void setAddressType(int addressType){ + _addressType = addressType; + } + + public String getAddressName() { return _name; } @@ -670,6 +703,10 @@ public abstract class AMQDestination implements Destination, Referenceable return _subject; } + public void setSubject(String subject) { + _subject = subject; + } + public AddressOption getCreate() { return _create; } @@ -681,49 +718,35 @@ public abstract class AMQDestination implements Destination, Referenceable public AddressOption getDelete() { return _delete; } - - public String getFilter() { - return _filter; - } - - public FilterType getFilterType() { - return _filterType; - } - - public boolean isNoLocal() { - return _isNoLocal; - } - - public int getNodeType() { - return _nodeType; + + public Node getTargetNode() + { + return _targetNode; } - public QpidQueueOptions getQueueOptions() { - return _queueOptions; + public void setTargetNode(Node node) + { + _targetNode = node; } - public List<Binding> getBindings() { - return _bindings; + public Node getSourceNode() + { + return _sourceNode; } - public void addBinding(Binding binding) { - this._bindings.add(binding); - } - - public DestSyntax getDestSyntax() { - return _destSyntax; - } - - public QpidExchangeOptions getExchangeOptions() { - return _exchangeOptions; + public void setSourceNode(Node node) + { + _sourceNode = node; } - public String getAlternateExchange() { - return _alternateExchange; + public Link getSourceLink() + { + return _sourceLink; } - public void setAlternateExchange(String alternateExchange) { - this._alternateExchange = alternateExchange; + public void setSourceLink(Link link) + { + _sourceLink = link; } public void setExchangeName(AMQShortString name) @@ -750,37 +773,35 @@ public abstract class AMQDestination implements Destination, Referenceable return Address.parse(str); } - private void getInfoFromAddress() + private void getInfoFromAddress() throws Exception { _name = _address.getName(); _subject = _address.getSubject(); - AddressHelper addrHelper = new AddressHelper(_address); + _addrHelper = new AddressHelper(_address); - _create = addrHelper.getCreate() != null ? - AddressOption.getOption(addrHelper.getCreate()):AddressOption.NEVER; + _create = _addrHelper.getCreate() != null ? + AddressOption.getOption(_addrHelper.getCreate()):AddressOption.NEVER; - _assert = addrHelper.getAssert() != null ? - AddressOption.getOption(addrHelper.getAssert()):AddressOption.ALWAYS; + _assert = _addrHelper.getAssert() != null ? + AddressOption.getOption(_addrHelper.getAssert()):AddressOption.NEVER; - _delete = addrHelper.getDelete() != null ? - AddressOption.getOption(addrHelper.getDelete()):AddressOption.NEVER; + _delete = _addrHelper.getDelete() != null ? + AddressOption.getOption(_addrHelper.getDelete()):AddressOption.NEVER; - _filter = addrHelper.getFilter(); - _isNoLocal = addrHelper.isNoLocal(); - _isDurable = addrHelper.isDurable(); - _isAutoDelete = addrHelper.isAutoDelete(); - _isExclusive = addrHelper.isExclusive(); - _browseOnly = addrHelper.isBrowseOnly(); - - _nodeType = addrHelper.getNodeType() == null || addrHelper.getNodeType().equals("queue")? - QUEUE_TYPE : TOPIC_TYPE; - - _alternateExchange = addrHelper.getAltExchange(); - - _queueOptions = addrHelper.getQpidQueueOptions(); - _exchangeOptions = addrHelper.getQpidExchangeOptions(); - _bindings = addrHelper.getBindings(); + _addressType = _addrHelper.getTargetNodeType(); + _targetNode = _addrHelper.getTargetNode(_addressType); + _sourceNode = _addrHelper.getSourceNode(_addressType); + _sourceLink = _addrHelper.getLink(); + } + + // This method is needed if we didn't know the node type at the beginning. + // Therefore we have to query the broker to figure out the type. + // Once the type is known we look for the necessary properties. + public void rebuildTargetAndSourceNodes(int addressType) + { + _targetNode = _addrHelper.getTargetNode(addressType); + _sourceNode = _addrHelper.getSourceNode(addressType); } // ----- / new address syntax ----------- diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 704dbf8bfc..8064ed1ae6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -23,8 +23,10 @@ import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.UNRELIABLE; import java.lang.ref.WeakReference; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -89,6 +91,9 @@ import java.util.Timer; import java.util.TimerTask; import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.messaging.address.Node; +import org.apache.qpid.client.messaging.address.Node.ExchangeNode; +import org.apache.qpid.client.messaging.address.Node.QueueNode; /** * This is a 0.10 Session @@ -354,13 +359,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - for (Binding binding: destination.getBindings()) + List<Binding> bindings = new ArrayList<Binding>(); + bindings.addAll(destination.getSourceNode().getBindings()); + bindings.addAll(destination.getTargetNode().getBindings()); + for (Binding binding: bindings) { - _logger.debug("Binding queue : " + queueName.toString() + + String queue = binding.getQueue() == null? + queueName.asString(): binding.getQueue(); + _logger.debug("Binding queue : " + queue + " exchange: " + binding.getExchange() + " using binding key " + binding.getBindingKey() + " with args " + printMap(binding.getArgs())); - getQpidSession().exchangeBind(queueName.toString(), + getQpidSession().exchangeBind(queue, binding.getExchange(), binding.getBindingKey(), binding.getArgs()); @@ -687,21 +697,34 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic queueName = amqd.getAMQQueueName(); } - Map<String,Object> arguments = new HashMap<String,Object>(); - if (noLocal || amqd.isNoLocal()) - { - arguments.put("no-local", true); + if (amqd.getDestSyntax() == DestSyntax.BURL) + { + Map<String,Object> arguments = new HashMap<String,Object>(); + if (noLocal) + { + arguments.put("no-local", true); + } + + /*if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null) + { + arguments.putAll(amqd.getta); + }*/ + + getQpidSession().queueDeclare(queueName.toString(), "" , arguments, + amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + amqd.isDurable() ? Option.DURABLE : Option.NONE, + amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } - - if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null) + else { - arguments.putAll(amqd.getQueueOptions()); + QueueNode node = (QueueNode)amqd.getSourceNode(); + getQpidSession().queueDeclare(queueName.toString(), "" , + node.getDeclareArgs(), + node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + node.isDurable() ? Option.DURABLE : Option.NONE, + node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } - getQpidSession().queueDeclare(queueName.toString(), amqd.getAlternateExchange() , arguments, - amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, - amqd.isDurable() ? Option.DURABLE : Option.NONE, - amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false if (!nowait) { @@ -1017,39 +1040,58 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } - public boolean isExchangeExist(AMQDestination dest,boolean assertNode) + public boolean isExchangeExist(AMQDestination dest,ExchangeNode node,boolean assertNode) { boolean match = true; - ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getName(), Option.NONE).get(); - match = !result.getNotFound(); + ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); + match = !result.getNotFound(); - if (match && assertNode) - { - match = (result.getDurable() == dest.isDurable()) && - (dest.getExchangeClass() != null) && - (dest.getExchangeClass().asString().equals(result.getType())) && - (matchProps(result.getArguments(),dest.getQueueOptions())); - } if (match) { - dest.setExchangeClass(new AMQShortString(result.getType())); + if (assertNode) + { + match = (result.getDurable() == node.isDurable()) && + (node.getExchangeType() != null && + node.getExchangeType().equals(result.getType())) && + (matchProps(result.getArguments(),node.getDeclareArgs())); + } + else if (node.getExchangeType() != null) + { + // even if assert is false, better to verify this + match = node.getExchangeType().equals(result.getType()); + if (!match) + { + _logger.debug("Exchange type doesn't match. Expected : " + node.getExchangeType() + + " actual " + result.getType()); + } + } + else + { + _logger.debug("Setting Exchange type " + result.getType()); + node.setExchangeType(result.getType()); + dest.setExchangeClass(new AMQShortString(result.getType())); + } } return match; } - public boolean isQueueExist(AMQDestination dest,boolean assertNode) + public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) { boolean match = true; - QueueQueryResult result = getQpidSession().queueQuery(dest.getName(), Option.NONE).get(); - match = dest.getName().equals(result.getQueue()); + QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); + match = dest.getAddressName().equals(result.getQueue()); if (match && assertNode) { - match = (result.getDurable() == dest.isDurable()) && - (result.getAutoDelete() == dest.isAutoDelete()) && - (result.getExclusive() == dest.isExclusive()) && - (matchProps(result.getArguments(),dest.getQueueOptions())); + match = (result.getDurable() == node.isDurable()) && + (result.getAutoDelete() == node.isAutoDelete()) && + (result.getExclusive() == node.isExclusive()) && + (matchProps(result.getArguments(),node.getDeclareArgs())); + } + else if (match) + { + // should I use the queried details to update the local data structure. } return match; @@ -1063,76 +1105,177 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic match = target.containsKey(key) && target.get(key).equals(source.get(key)); - if (!match) return match; + if (!match) + { + StringBuffer buf = new StringBuffer(); + buf.append("Property given in address did not match with the args sent by the broker."); + buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, "); + buf.append(" Actual { ").append(key).append(" : ").append(target.get(key)).append(" }"); + _logger.debug(buf.toString()); + return match; + } } return match; } + @SuppressWarnings("deprecation") public void handleAddressBasedDestination(AMQDestination dest, boolean isConsumer, boolean noWait) throws AMQException { - boolean noLocal = dest.isNoLocal(); boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || (!isConsumer && dest.getAssert() == AddressOption.SENDER); + + boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || + (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || + (!isConsumer && dest.getCreate() == AddressOption.SENDER); - if (isExchangeExist(dest,assertNode)) + + int type = resolveAddressType(dest); + + switch (type) { - dest.setExchangeName(new AMQShortString(dest.getName())); - dest.setRoutingKey(new AMQShortString(dest.getSubject())); - if (isConsumer) + case AMQDestination.QUEUE_TYPE: { - dest.setQueueName(null); - dest.addBinding(new Binding(dest.getName(), - dest.getSubject(), - null)); + if (isQueueExist(dest,(QueueNode)dest.getSourceNode(),assertNode)) + { + setLegacyFiledsForQueueType(dest); + break; + } + else if(createNode) + { + setLegacyFiledsForQueueType(dest); + send0_10QueueDeclare(dest,null,false,noWait); + break; + } } + + case AMQDestination.TOPIC_TYPE: + { + if (isExchangeExist(dest,(ExchangeNode)dest.getTargetNode(),assertNode)) + { + setLegacyFiledsForTopicType(dest); + verifySubject(dest); + createSubscriptionQueue(dest); + break; + } + else if(createNode) + { + setLegacyFiledsForTopicType(dest); + verifySubject(dest); + sendExchangeDeclare(dest.getAddressName(), + dest.getExchangeClass().asString(), + dest.getTargetNode().getAlternateExchange(), + dest.getTargetNode().getDeclareArgs(), + false); + createSubscriptionQueue(dest); + break; + } + } + + default: + throw new AMQException( + "The name '" + dest.getAddressName() + + "' supplied in the address doesn't resolve to an exchange or a queue"); } - else if (isQueueExist(dest,assertNode)) - { - dest.setQueueName(new AMQShortString(dest.getName())); - dest.setExchangeName(new AMQShortString("")); - dest.setExchangeClass(new AMQShortString("")); - dest.setRoutingKey(dest.getAMQQueueName()); - } - else if (dest.getCreate() == AddressOption.ALWAYS || - dest.getCreate() == AddressOption.RECEIVER && isConsumer || - dest.getCreate() == AddressOption.SENDER && !isConsumer) + } + + int resolveAddressType(AMQDestination dest) throws AMQException + { + int type = dest.getAddressType(); + String name = dest.getAddressName(); + if (type != AMQDestination.UNKNOWN_TYPE) + { + return type; + } + else + { + ExchangeBoundResult result = getQpidSession().exchangeBound(name,name,null,null).get(); + if (result.getQueueNotFound() && result.getExchangeNotFound()) { + //neither a queue nor an exchange exists with that name; treat it as a queue + type = AMQDestination.QUEUE_TYPE; + } else if (result.getExchangeNotFound()) { + //name refers to a queue + type = AMQDestination.QUEUE_TYPE; + } else if (result.getQueueNotFound()) { + //name refers to an exchange + type = AMQDestination.TOPIC_TYPE; + } else { + //both a queue and exchange exist for that name + throw new AMQException("Ambiguous address, please specify queue or topic as node type"); + } + dest.rebuildTargetAndSourceNodes(type); + return type; + } + } + + @SuppressWarnings("deprecation") + private void verifySubject(AMQDestination dest) throws AMQException + { + if (dest.getSubject() == null || dest.getSubject().trim().equals("")) { - if (dest.getNodeType() == AMQDestination.QUEUE_TYPE) + if (dest.getExchangeClass() == ExchangeDefaults.TOPIC_EXCHANGE_CLASS) { - dest.setQueueName(new AMQShortString(dest.getName())); - dest.setExchangeName(new AMQShortString("")); - dest.setExchangeClass(new AMQShortString("")); - dest.setRoutingKey(dest.getAMQQueueName()); + dest.setRoutingKey(ExchangeDefaults.WILDCARD_ANY); + dest.setSubject(ExchangeDefaults.WILDCARD_ANY.toString()); } - else + else if (dest.getExchangeClass() == ExchangeDefaults.DIRECT_EXCHANGE_CLASS) { - dest.setQueueName(null); - dest.setExchangeName(new AMQShortString(dest.getName())); - dest.setExchangeClass(dest.getExchangeClass() == null? - ExchangeDefaults.TOPIC_EXCHANGE_CLASS:dest.getExchangeClass()); - dest.setRoutingKey(new AMQShortString(dest.getSubject())); - dest.addBinding(new Binding(dest.getName(), - dest.getSubject(), - null)); - - sendExchangeDeclare(dest.getName(), dest.getExchangeClass().asString(), - dest.getAlternateExchange(), dest.getExchangeOptions(),false); - + throw new AMQException("If sending to an exchange of type direct," + + " a valid subject must be specified"); } - - send0_10QueueDeclare(dest,null,noLocal,noWait); } - else + } + + private void createSubscriptionQueue(AMQDestination dest) throws AMQException + { + if (dest.getSourceNode() != null) { - throw new AMQException( - "The name '" + dest.getName() + - "' supplied in the address doesn't resolve to an exchange or a queue"); + QueueNode node = (QueueNode)dest.getSourceNode(); + if (dest.getQueueName() == null || !isQueueExist(dest,node,true)) + { + // can name : my-queue be used in x-declare? + // if so set it to dest queue name + // if (dest.getQueueName() == null) { dest.setName(node.getName()) } + send0_10QueueDeclare(dest,null,false,false); + } + node.addBinding(new Binding(dest.getAddressName(), + dest.getQueueName(),// should have one by now + dest.getSubject(), + node.getDeclareArgs())); } + else + { + send0_10QueueDeclare(dest,null,false,false); + dest.getTargetNode().addBinding(new Binding(dest.getAddressName(), + null, + dest.getSubject(), + null)); + } + } + + private void setLegacyFiledsForQueueType(AMQDestination dest) + { + // legacy support + dest.setQueueName(new AMQShortString(dest.getAddressName())); + dest.setExchangeName(new AMQShortString("")); + dest.setExchangeClass(new AMQShortString("")); + dest.setRoutingKey(dest.getAMQQueueName()); + } + + private void setLegacyFiledsForTopicType(AMQDestination dest) + { + // legacy support + dest.setQueueName(null); + dest.setExchangeName(new AMQShortString(dest.getAddressName())); + ExchangeNode node = (ExchangeNode)dest.getTargetNode(); + dest.setExchangeClass(node.getExchangeType() == null? + ExchangeDefaults.TOPIC_EXCHANGE_CLASS: + new AMQShortString(node.getExchangeType())); + dest.setRoutingKey(new AMQShortString(dest.getSubject())); } /** This should be moved to a suitable utility class */ diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 84bc4d596e..1d6dde7c29 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -21,10 +21,15 @@ package org.apache.qpid.client.messaging.address; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.messaging.address.Node.ExchangeNode; +import org.apache.qpid.client.messaging.address.Node.QueueNode; +import org.apache.qpid.client.messaging.address.Node.UnknownNodeType; import org.apache.qpid.configuration.Accessor; import org.apache.qpid.configuration.Accessor.MapAccessor; import org.apache.qpid.messaging.Address; @@ -35,8 +40,11 @@ import org.apache.qpid.messaging.Address; */ public class AddressHelper { - public static final String NODE_PROPS = "node-properties"; - public static final String X_PROPS = "x-properties"; + public static final String NODE = "node"; + public static final String LINK = "link"; + public static final String X_DECLARE = "x-declare"; + public static final String X_BINDINGS = "x-bindings"; + public static final String X_SUBSCRIBE = "x-subscribes"; public static final String CREATE = "create"; public static final String ASSERT = "assert"; public static final String DELETE = "delete"; @@ -49,22 +57,34 @@ public class AddressHelper public static final String ALT_EXCHANGE = "alt-exchange"; public static final String BINDINGS = "bindings"; public static final String BROWSE_ONLY = "browse"; + public static final String CAPACITY = "capacity"; + public static final String NAME = "name"; + public static final String EXCHANGE = "exchange"; + public static final String QUEUE = "queue"; + public static final String KEY = "key"; + public static final String ARGUMENTS = "arguments"; private Address address; private Accessor addressProps; private Accessor nodeProps; - private Accessor xProps; + private Accessor linkProps; public AddressHelper(Address address) { this.address = address; addressProps = new MapAccessor(address.getOptions()); Map node_props = address.getOptions() == null || - address.getOptions().get(NODE_PROPS) == null ? - null : (Map)address.getOptions().get(NODE_PROPS); - nodeProps = new MapAccessor(node_props); - xProps = new MapAccessor(node_props == null || node_props.get(X_PROPS) == null? - null: (Map)node_props.get(X_PROPS)); + address.getOptions().get(NODE) == null ? + null : (Map)address.getOptions().get(NODE); + + if (node_props != null) { nodeProps = new MapAccessor(node_props); } + + + Map link_props = address.getOptions() == null || + address.getOptions().get(LINK) == null ? + null : (Map)address.getOptions().get(LINK); + + if (link_props != null) { linkProps = new MapAccessor(link_props); } } public String getCreate() @@ -82,135 +102,218 @@ public class AddressHelper return addressProps.getString(DELETE); } - public String getFilter() - { - return addressProps.getString(FILTER); - } - public boolean isNoLocal() { Boolean b = nodeProps.getBoolean(NO_LOCAL); return b == null ? false : b ; } - - public boolean isDurable() - { - Boolean b = nodeProps.getBoolean(DURABLE); - return b == null ? false : b ; - } - - public boolean isExclusive() - { - Boolean b = xProps.getBoolean(EXCLUSIVE); - return b == null ? false : b ; - } - - public boolean isAutoDelete() - { - Boolean b = xProps.getBoolean(AUTO_DELETE); - return b == null ? false : b ; - } - + public boolean isBrowseOnly() { - Boolean b = xProps.getBoolean(BROWSE_ONLY); + Boolean b = nodeProps.getBoolean(BROWSE_ONLY); return b == null ? false : b ; } - - public String getNodeType() - { - return nodeProps.getString(TYPE); - } - - public String getAltExchange() - { - return xProps.getString(ALT_EXCHANGE); - } - - public QpidQueueOptions getQpidQueueOptions() + + public QpidQueueOptions getQpidQueueOptions(MapAccessor args) { QpidQueueOptions options = new QpidQueueOptions(); - if (xProps.getInt(QpidQueueOptions.QPID_MAX_COUNT) != null) + if (args.getInt(QpidQueueOptions.QPID_MAX_COUNT) != null) { - options.setMaxCount(xProps.getInt(QpidQueueOptions.QPID_MAX_COUNT)); + options.setMaxCount(args.getInt(QpidQueueOptions.QPID_MAX_COUNT)); } - if (xProps.getInt(QpidQueueOptions.QPID_MAX_SIZE) != null) + if (args.getInt(QpidQueueOptions.QPID_MAX_SIZE) != null) { - options.setMaxSize(xProps.getInt(QpidQueueOptions.QPID_MAX_SIZE)); + options.setMaxSize(args.getInt(QpidQueueOptions.QPID_MAX_SIZE)); } - if (xProps.getInt(QpidQueueOptions.QPID_POLICY_TYPE) != null) + if (args.getInt(QpidQueueOptions.QPID_POLICY_TYPE) != null) { - options.setPolicyType(xProps.getString(QpidQueueOptions.QPID_POLICY_TYPE)); + options.setPolicyType(args.getString(QpidQueueOptions.QPID_POLICY_TYPE)); } - if (xProps.getInt(QpidQueueOptions.QPID_PERSIST_LAST_NODE) != null) + if (args.getInt(QpidQueueOptions.QPID_PERSIST_LAST_NODE) != null) { options.setPersistLastNode(); } - if (xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE) != null) + if (args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE) != null) { - options.setOrderingPolicy(xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE)); - options.setLvqKey(xProps.getString(QpidQueueOptions.QPID_LVQ_KEY)); + options.setOrderingPolicy(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE)); + options.setLvqKey(args.getString(QpidQueueOptions.QPID_LVQ_KEY)); } - else if (xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE) != null) + else if (args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE) != null) { - options.setOrderingPolicy(xProps.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE)); - options.setLvqKey(xProps.getString(QpidQueueOptions.QPID_LVQ_KEY)); + options.setOrderingPolicy(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE)); + options.setLvqKey(args.getString(QpidQueueOptions.QPID_LVQ_KEY)); } - if (xProps.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION) != null) + if (args.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION) != null) { - options.setQueueEvents(xProps.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION)); + options.setQueueEvents(args.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION)); } return options; } - public QpidExchangeOptions getQpidExchangeOptions() + public QpidExchangeOptions getQpidExchangeOptions(MapAccessor args) { QpidExchangeOptions options = new QpidExchangeOptions(); - if (xProps.getInt(QpidExchangeOptions.QPID_EXCLUSIVE_BINDING) != null) + if (args.getInt(QpidExchangeOptions.QPID_EXCLUSIVE_BINDING) != null) { options.setExclusiveBinding(); } - if (xProps.getInt(QpidExchangeOptions.QPID_INITIAL_VALUE_EXCHANGE) != null) + if (args.getInt(QpidExchangeOptions.QPID_INITIAL_VALUE_EXCHANGE) != null) { options.setInitialValueExchange(); } - if (xProps.getInt(QpidExchangeOptions.QPID_MSG_SEQUENCE) != null) + if (args.getInt(QpidExchangeOptions.QPID_MSG_SEQUENCE) != null) { options.setMessageSequencing(); } return options; } - public List<Binding> getBindings() + @SuppressWarnings("unchecked") + public List<Binding> getBindings(Map props) { - List<Binding> bindings = new ArrayList<Binding>(); - if (address.getOptions() != null && - address.getOptions().get(NODE_PROPS) != null && - ((Map)address.getOptions().get(NODE_PROPS)).get(X_PROPS) != null) + List<Binding> bindings = new ArrayList<Binding>(); + List<Map> bindingList = (List<Map>)props.get(X_BINDINGS); + if (bindingList != null) { - Map node_props = (Map)address.getOptions().get(NODE_PROPS); - List<String> bindingList = - (List<String>)((Map)node_props.get(X_PROPS)).get(BINDINGS); - if (bindingList != null) + for (Map bindingMap: bindingList) { - for (String bindingStr: bindingList) - { - Address addr = Address.parse(bindingStr); - Binding binding = new Binding(addr.getName(), - addr.getSubject(), - addr.getOptions()); - bindings.add(binding); - } + Binding binding = new Binding((String)bindingMap.get(EXCHANGE), + (String)bindingMap.get(QUEUE), + (String)bindingMap.get(KEY), + bindingMap.get(ARGUMENTS) == null ? + Collections.EMPTY_MAP: + (Map<String,Object>)bindingMap.get(ARGUMENTS)); + bindings.add(binding); } } return bindings; } + + public Map getDeclareArgs(Map props) + { + if (props != null) + { + return (Map)props.get(X_DECLARE); + } + else + { + return Collections.EMPTY_MAP; + } + } + + public int getTargetNodeType() throws Exception + { + if (nodeProps == null || nodeProps.getString(TYPE) == null) + { + // need to query and figure out + return AMQDestination.UNKNOWN_TYPE; + } + else if (nodeProps.getString(TYPE).equals("queue")) + { + return AMQDestination.QUEUE_TYPE; + } + else if ((nodeProps.getString(TYPE).equals("topic") || + nodeProps.getString(TYPE).equals("direct") || + nodeProps.getString(TYPE).equals("fanout") || + nodeProps.getString(TYPE).equals("match") || + nodeProps.getString(TYPE).equals("xml")) ) + { + return AMQDestination.TOPIC_TYPE; + } + else + { + throw new Exception("unkown exchange type"); + } + } + + public Node getTargetNode(int addressType) + { + // target node here is the default exchange + if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE) + { + return new ExchangeNode(); + } + else if (addressType == AMQDestination.TOPIC_TYPE) + { + Map node = (Map)address.getOptions().get(NODE); + return createExchangeNode(node); + } + else + { + // don't know yet + return null; + } + } + + private Node createExchangeNode(Map parent) + { + Map declareArgs = getDeclareArgs(parent); + MapAccessor argsMap = new MapAccessor(declareArgs); + ExchangeNode node = new ExchangeNode(); + node.setExchangeType(nodeProps.getString(TYPE)); + node.setDeclareArgs(getQpidExchangeOptions(argsMap)); + fillInCommonNodeArgs(node,parent,argsMap); + return node; + } + + private Node createQueueNode(Map parent) + { + Map declareArgs = getDeclareArgs(parent); + MapAccessor argsMap = new MapAccessor(declareArgs); + QueueNode node = new QueueNode(); + node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); + node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null? false : argsMap.getBoolean(EXCLUSIVE)); + node.setDeclareArgs(getQpidQueueOptions(argsMap)); + fillInCommonNodeArgs(node,parent,argsMap); + + return node; + } + + private void fillInCommonNodeArgs(Node node,Map parent,MapAccessor argsMap) + { + node.setDurable(nodeProps.getBoolean(DURABLE) == null? false : nodeProps.getBoolean(DURABLE)); + node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null? false : argsMap.getBoolean(AUTO_DELETE)); + node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); + node.setBindings(getBindings(parent)); + } + + public Node getSourceNode(int addressType) + { + if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) + { + return createQueueNode((Map)address.getOptions().get(NODE)); + } + if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null) + { + return createQueueNode((Map)address.getOptions().get(LINK)); + } + else + { + // need to query the info + return new QueueNode(); + } + } + + public Link getLink() + { + Link link = new Link(); + if (linkProps != null) + { + link.setDurable(linkProps.getBoolean(DURABLE)); + link.setName(linkProps.getString(NAME)); + link.setCapacity(linkProps.getInt(CAPACITY)); + link.setFilter(linkProps.getString(FILTER)); + // so far filter type not used + } + + return link; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java new file mode 100644 index 0000000000..367191e74e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.messaging.address; + +import org.apache.qpid.client.messaging.address.Node.QueueNode; + +public class Link +{ + public enum FilterType { SQL92, XQUERY, SUBJECT } + + protected String name; + protected String _filter; + protected FilterType _filterType = FilterType.SUBJECT; + protected boolean _isNoLocal; + protected boolean _isDurable; + protected int _capacity = 0; + protected Node node; + + public Node getNode() + { + return node; + } + + public void setNode(Node node) + { + this.node = node; + } + + public boolean isDurable() + { + return _isDurable; + } + + public void setDurable(boolean durable) + { + _isDurable = durable; + } + + public String getFilter() + { + return _filter; + } + + public void setFilter(String filter) + { + this._filter = filter; + } + + public FilterType getFilterType() + { + return _filterType; + } + + public void setFilterType(FilterType type) + { + _filterType = type; + } + + public boolean isNoLocal() + { + return _isNoLocal; + } + + public void setNoLocal(boolean noLocal) + { + _isNoLocal = noLocal; + } + + public int getCapacity() + { + return _capacity; + } + + public void setCapacity(int capacity) + { + this._capacity = capacity; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java new file mode 100644 index 0000000000..24686cab17 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.client.messaging.address; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.naming.OperationNotSupportedException; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQDestination.Binding; + +public abstract class Node +{ + protected int _nodeType = AMQDestination.UNKNOWN_TYPE; + protected boolean _isDurable; + protected boolean _isAutoDelete; + protected String _alternateExchange; + protected List<Binding> _bindings = new ArrayList<Binding>(); + + public int getType() + { + return _nodeType; + } + + public boolean isDurable() + { + return _isDurable; + } + + public void setDurable(boolean durable) + { + _isDurable = durable; + } + + public boolean isAutoDelete() + { + return _isAutoDelete; + } + + public void setAutoDelete(boolean autoDelete) + { + _isAutoDelete = autoDelete; + } + + public String getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(String altExchange) + { + _alternateExchange = altExchange; + } + + public List<Binding> getBindings() + { + return _bindings; + } + + public void setBindings(List<Binding> bindings) + { + _bindings = bindings; + } + + public void addBinding(Binding binding) { + this._bindings.add(binding); + } + + public abstract Map<String,Object> getDeclareArgs(); + + public static class QueueNode extends Node + { + protected boolean _isExclusive; + protected QpidQueueOptions _queueOptions = new QpidQueueOptions(); + + public QueueNode() + { + _nodeType = AMQDestination.QUEUE_TYPE; + } + + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + + public Map<String,Object> getDeclareArgs() + { + return _queueOptions; + } + + public void setDeclareArgs(QpidQueueOptions options) + { + _queueOptions = options; + } + } + + public static class ExchangeNode extends Node + { + protected QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); + protected String _exchangeType; + + public ExchangeNode() + { + _nodeType = AMQDestination.TOPIC_TYPE; + } + + public String getExchangeType() + { + return _exchangeType; + } + + public void setExchangeType(String exchangeType) + { + _exchangeType = exchangeType; + } + + public Map<String,Object> getDeclareArgs() + { + return _exchangeOptions; + } + + public void setDeclareArgs(QpidExchangeOptions options) + { + _exchangeOptions = options; + } + } + + public static class UnknownNodeType extends Node + { + public Map<String,Object> getDeclareArgs() + { + return Collections.emptyMap(); + } + } +} |
