From 7836498914eab08ec0eb7b10824deaa4d78db976 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 15 Apr 2010 19:39:06 +0000 Subject: This is related to QPID-2496 The changes include support the new addressing structure and most items on the list specified in the JIRA. The following is not included in the commit 1. Add subject as filter in JMS - for exchanges use it as binding key and for queues use it as a selector - this needs to be thought through. Besides JMS already provides a way to handle this. 2. Implementation of DELETE option. Further testing needs to be done to figure out the impact. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@934563 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQDestination.java | 165 +++++++----- .../org/apache/qpid/client/AMQSession_0_10.java | 295 +++++++++++++++------ .../client/messaging/address/AddressHelper.java | 265 ++++++++++++------ .../apache/qpid/client/messaging/address/Link.java | 106 ++++++++ .../apache/qpid/client/messaging/address/Node.java | 162 +++++++++++ 5 files changed, 764 insertions(+), 229 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 1ed64e7890..2d7844944a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -32,6 +32,8 @@ import javax.naming.Referenceable; import javax.naming.StringRefAddr; import org.apache.qpid.client.messaging.address.AddressHelper; +import org.apache.qpid.client.messaging.address.Link; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.messaging.address.QpidExchangeOptions; import org.apache.qpid.client.messaging.address.QpidQueueOptions; import org.apache.qpid.configuration.ClientProperties; @@ -119,27 +121,25 @@ public abstract class AMQDestination implements Destination, Referenceable } } - public enum FilterType { SQL92, XQUERY, SUBJECT } + protected static DestSyntax defaultDestSyntax; protected DestSyntax _destSyntax; + protected AddressHelper _addrHelper; protected Address _address; + protected int _addressType = AMQDestination.UNKNOWN_TYPE; protected String _name; protected String _subject; protected AddressOption _create = AddressOption.NEVER; - protected AddressOption _assert = AddressOption.ALWAYS; - protected AddressOption _delete = AddressOption.NEVER; - - protected String _filter; - protected FilterType _filterType = FilterType.SUBJECT; - protected boolean _isNoLocal; - protected int _nodeType = QUEUE_TYPE; - protected String _alternateExchange; - protected QpidQueueOptions _queueOptions; - protected QpidExchangeOptions _exchangeOptions; - protected List _bindings = new ArrayList(); + protected AddressOption _assert = AddressOption.NEVER; + protected AddressOption _delete = AddressOption.NEVER; + + protected Node _targetNode; + protected Node _sourceNode; + protected Link _targetLink; + protected Link _sourceLink; // ----- / Fields required to support new address syntax ------- static @@ -149,7 +149,7 @@ public abstract class AMQDestination implements Destination, Referenceable DestSyntax.BURL.toString())); } - protected AMQDestination(Address address) + protected AMQDestination(Address address) throws Exception { this._address = address; getInfoFromAddress(); @@ -169,7 +169,16 @@ public abstract class AMQDestination implements Destination, Referenceable { _destSyntax = DestSyntax.ADDR; this._address = createAddressFromString(str); - getInfoFromAddress(); + try + { + getInfoFromAddress(); + } + catch(Exception e) + { + URISyntaxException ex = new URISyntaxException(str,"Error parsing address"); + ex.initCause(e); + throw ex; + } } _logger.info("Based on " + str + " the selected destination syntax is " + _destSyntax); } @@ -264,6 +273,11 @@ public abstract class AMQDestination implements Destination, Referenceable _logger.info("Based on " + toString() + " the selected destination syntax is " + _destSyntax); } + public DestSyntax getDestSyntax() + { + return _destSyntax; + } + public AMQShortString getEncodedName() { if(_urlAsShortString == null) @@ -629,15 +643,21 @@ public abstract class AMQDestination implements Destination, Referenceable } // ----- new address syntax ----------- + public static class Binding { String exchange; String bindingKey; + String queue; Map args; - public Binding(String exchange,String bindingKey,Map args) + public Binding(String exchange, + String queue, + String bindingKey, + Map args) { this.exchange = exchange; + this.queue = queue; this.bindingKey = bindingKey; this.args = args; } @@ -647,6 +667,11 @@ public abstract class AMQDestination implements Destination, Referenceable return exchange; } + public String getQueue() + { + return queue; + } + public String getBindingKey() { return bindingKey; @@ -662,7 +687,15 @@ public abstract class AMQDestination implements Destination, Referenceable return _address; } - public String getName() { + public int getAddressType(){ + return _addressType; + } + + public void setAddressType(int addressType){ + _addressType = addressType; + } + + public String getAddressName() { return _name; } @@ -670,6 +703,10 @@ public abstract class AMQDestination implements Destination, Referenceable return _subject; } + public void setSubject(String subject) { + _subject = subject; + } + public AddressOption getCreate() { return _create; } @@ -681,49 +718,35 @@ public abstract class AMQDestination implements Destination, Referenceable public AddressOption getDelete() { return _delete; } - - public String getFilter() { - return _filter; - } - - public FilterType getFilterType() { - return _filterType; - } - - public boolean isNoLocal() { - return _isNoLocal; - } - - public int getNodeType() { - return _nodeType; + + public Node getTargetNode() + { + return _targetNode; } - public QpidQueueOptions getQueueOptions() { - return _queueOptions; + public void setTargetNode(Node node) + { + _targetNode = node; } - public List getBindings() { - return _bindings; + public Node getSourceNode() + { + return _sourceNode; } - public void addBinding(Binding binding) { - this._bindings.add(binding); - } - - public DestSyntax getDestSyntax() { - return _destSyntax; - } - - public QpidExchangeOptions getExchangeOptions() { - return _exchangeOptions; + public void setSourceNode(Node node) + { + _sourceNode = node; } - public String getAlternateExchange() { - return _alternateExchange; + public Link getSourceLink() + { + return _sourceLink; } - public void setAlternateExchange(String alternateExchange) { - this._alternateExchange = alternateExchange; + public void setSourceLink(Link link) + { + _sourceLink = link; } public void setExchangeName(AMQShortString name) @@ -750,37 +773,35 @@ public abstract class AMQDestination implements Destination, Referenceable return Address.parse(str); } - private void getInfoFromAddress() + private void getInfoFromAddress() throws Exception { _name = _address.getName(); _subject = _address.getSubject(); - AddressHelper addrHelper = new AddressHelper(_address); + _addrHelper = new AddressHelper(_address); - _create = addrHelper.getCreate() != null ? - AddressOption.getOption(addrHelper.getCreate()):AddressOption.NEVER; + _create = _addrHelper.getCreate() != null ? + AddressOption.getOption(_addrHelper.getCreate()):AddressOption.NEVER; - _assert = addrHelper.getAssert() != null ? - AddressOption.getOption(addrHelper.getAssert()):AddressOption.ALWAYS; + _assert = _addrHelper.getAssert() != null ? + AddressOption.getOption(_addrHelper.getAssert()):AddressOption.NEVER; - _delete = addrHelper.getDelete() != null ? - AddressOption.getOption(addrHelper.getDelete()):AddressOption.NEVER; + _delete = _addrHelper.getDelete() != null ? + AddressOption.getOption(_addrHelper.getDelete()):AddressOption.NEVER; - _filter = addrHelper.getFilter(); - _isNoLocal = addrHelper.isNoLocal(); - _isDurable = addrHelper.isDurable(); - _isAutoDelete = addrHelper.isAutoDelete(); - _isExclusive = addrHelper.isExclusive(); - _browseOnly = addrHelper.isBrowseOnly(); - - _nodeType = addrHelper.getNodeType() == null || addrHelper.getNodeType().equals("queue")? - QUEUE_TYPE : TOPIC_TYPE; - - _alternateExchange = addrHelper.getAltExchange(); - - _queueOptions = addrHelper.getQpidQueueOptions(); - _exchangeOptions = addrHelper.getQpidExchangeOptions(); - _bindings = addrHelper.getBindings(); + _addressType = _addrHelper.getTargetNodeType(); + _targetNode = _addrHelper.getTargetNode(_addressType); + _sourceNode = _addrHelper.getSourceNode(_addressType); + _sourceLink = _addrHelper.getLink(); + } + + // This method is needed if we didn't know the node type at the beginning. + // Therefore we have to query the broker to figure out the type. + // Once the type is known we look for the necessary properties. + public void rebuildTargetAndSourceNodes(int addressType) + { + _targetNode = _addrHelper.getTargetNode(addressType); + _sourceNode = _addrHelper.getSourceNode(addressType); } // ----- / new address syntax ----------- diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 704dbf8bfc..8064ed1ae6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -23,8 +23,10 @@ import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.UNRELIABLE; import java.lang.ref.WeakReference; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -89,6 +91,9 @@ import java.util.Timer; import java.util.TimerTask; import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.messaging.address.Node; +import org.apache.qpid.client.messaging.address.Node.ExchangeNode; +import org.apache.qpid.client.messaging.address.Node.QueueNode; /** * This is a 0.10 Session @@ -354,13 +359,18 @@ public class AMQSession_0_10 extends AMQSession bindings = new ArrayList(); + bindings.addAll(destination.getSourceNode().getBindings()); + bindings.addAll(destination.getTargetNode().getBindings()); + for (Binding binding: bindings) { - _logger.debug("Binding queue : " + queueName.toString() + + String queue = binding.getQueue() == null? + queueName.asString(): binding.getQueue(); + _logger.debug("Binding queue : " + queue + " exchange: " + binding.getExchange() + " using binding key " + binding.getBindingKey() + " with args " + printMap(binding.getArgs())); - getQpidSession().exchangeBind(queueName.toString(), + getQpidSession().exchangeBind(queue, binding.getExchange(), binding.getBindingKey(), binding.getArgs()); @@ -687,21 +697,34 @@ public class AMQSession_0_10 extends AMQSession arguments = new HashMap(); - if (noLocal || amqd.isNoLocal()) - { - arguments.put("no-local", true); + if (amqd.getDestSyntax() == DestSyntax.BURL) + { + Map arguments = new HashMap(); + if (noLocal) + { + arguments.put("no-local", true); + } + + /*if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null) + { + arguments.putAll(amqd.getta); + }*/ + + getQpidSession().queueDeclare(queueName.toString(), "" , arguments, + amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + amqd.isDurable() ? Option.DURABLE : Option.NONE, + amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } - - if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null) + else { - arguments.putAll(amqd.getQueueOptions()); + QueueNode node = (QueueNode)amqd.getSourceNode(); + getQpidSession().queueDeclare(queueName.toString(), "" , + node.getDeclareArgs(), + node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + node.isDurable() ? Option.DURABLE : Option.NONE, + node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } - getQpidSession().queueDeclare(queueName.toString(), amqd.getAlternateExchange() , arguments, - amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, - amqd.isDurable() ? Option.DURABLE : Option.NONE, - amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false if (!nowait) { @@ -1017,39 +1040,58 @@ public class AMQSession_0_10 extends AMQSession getBindings() + @SuppressWarnings("unchecked") + public List getBindings(Map props) { - List bindings = new ArrayList(); - if (address.getOptions() != null && - address.getOptions().get(NODE_PROPS) != null && - ((Map)address.getOptions().get(NODE_PROPS)).get(X_PROPS) != null) + List bindings = new ArrayList(); + List bindingList = (List)props.get(X_BINDINGS); + if (bindingList != null) { - Map node_props = (Map)address.getOptions().get(NODE_PROPS); - List bindingList = - (List)((Map)node_props.get(X_PROPS)).get(BINDINGS); - if (bindingList != null) + for (Map bindingMap: bindingList) { - for (String bindingStr: bindingList) - { - Address addr = Address.parse(bindingStr); - Binding binding = new Binding(addr.getName(), - addr.getSubject(), - addr.getOptions()); - bindings.add(binding); - } + Binding binding = new Binding((String)bindingMap.get(EXCHANGE), + (String)bindingMap.get(QUEUE), + (String)bindingMap.get(KEY), + bindingMap.get(ARGUMENTS) == null ? + Collections.EMPTY_MAP: + (Map)bindingMap.get(ARGUMENTS)); + bindings.add(binding); } } return bindings; } + + public Map getDeclareArgs(Map props) + { + if (props != null) + { + return (Map)props.get(X_DECLARE); + } + else + { + return Collections.EMPTY_MAP; + } + } + + public int getTargetNodeType() throws Exception + { + if (nodeProps == null || nodeProps.getString(TYPE) == null) + { + // need to query and figure out + return AMQDestination.UNKNOWN_TYPE; + } + else if (nodeProps.getString(TYPE).equals("queue")) + { + return AMQDestination.QUEUE_TYPE; + } + else if ((nodeProps.getString(TYPE).equals("topic") || + nodeProps.getString(TYPE).equals("direct") || + nodeProps.getString(TYPE).equals("fanout") || + nodeProps.getString(TYPE).equals("match") || + nodeProps.getString(TYPE).equals("xml")) ) + { + return AMQDestination.TOPIC_TYPE; + } + else + { + throw new Exception("unkown exchange type"); + } + } + + public Node getTargetNode(int addressType) + { + // target node here is the default exchange + if (nodeProps == null || addressType == AMQDestination.QUEUE_TYPE) + { + return new ExchangeNode(); + } + else if (addressType == AMQDestination.TOPIC_TYPE) + { + Map node = (Map)address.getOptions().get(NODE); + return createExchangeNode(node); + } + else + { + // don't know yet + return null; + } + } + + private Node createExchangeNode(Map parent) + { + Map declareArgs = getDeclareArgs(parent); + MapAccessor argsMap = new MapAccessor(declareArgs); + ExchangeNode node = new ExchangeNode(); + node.setExchangeType(nodeProps.getString(TYPE)); + node.setDeclareArgs(getQpidExchangeOptions(argsMap)); + fillInCommonNodeArgs(node,parent,argsMap); + return node; + } + + private Node createQueueNode(Map parent) + { + Map declareArgs = getDeclareArgs(parent); + MapAccessor argsMap = new MapAccessor(declareArgs); + QueueNode node = new QueueNode(); + node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); + node.setExclusive(argsMap.getBoolean(EXCLUSIVE) == null? false : argsMap.getBoolean(EXCLUSIVE)); + node.setDeclareArgs(getQpidQueueOptions(argsMap)); + fillInCommonNodeArgs(node,parent,argsMap); + + return node; + } + + private void fillInCommonNodeArgs(Node node,Map parent,MapAccessor argsMap) + { + node.setDurable(nodeProps.getBoolean(DURABLE) == null? false : nodeProps.getBoolean(DURABLE)); + node.setAutoDelete(argsMap.getBoolean(AUTO_DELETE) == null? false : argsMap.getBoolean(AUTO_DELETE)); + node.setAlternateExchange(argsMap.getString(ALT_EXCHANGE)); + node.setBindings(getBindings(parent)); + } + + public Node getSourceNode(int addressType) + { + if (addressType == AMQDestination.QUEUE_TYPE && nodeProps != null) + { + return createQueueNode((Map)address.getOptions().get(NODE)); + } + if (addressType == AMQDestination.TOPIC_TYPE && linkProps != null) + { + return createQueueNode((Map)address.getOptions().get(LINK)); + } + else + { + // need to query the info + return new QueueNode(); + } + } + + public Link getLink() + { + Link link = new Link(); + if (linkProps != null) + { + link.setDurable(linkProps.getBoolean(DURABLE)); + link.setName(linkProps.getString(NAME)); + link.setCapacity(linkProps.getInt(CAPACITY)); + link.setFilter(linkProps.getString(FILTER)); + // so far filter type not used + } + + return link; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java new file mode 100644 index 0000000000..367191e74e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.messaging.address; + +import org.apache.qpid.client.messaging.address.Node.QueueNode; + +public class Link +{ + public enum FilterType { SQL92, XQUERY, SUBJECT } + + protected String name; + protected String _filter; + protected FilterType _filterType = FilterType.SUBJECT; + protected boolean _isNoLocal; + protected boolean _isDurable; + protected int _capacity = 0; + protected Node node; + + public Node getNode() + { + return node; + } + + public void setNode(Node node) + { + this.node = node; + } + + public boolean isDurable() + { + return _isDurable; + } + + public void setDurable(boolean durable) + { + _isDurable = durable; + } + + public String getFilter() + { + return _filter; + } + + public void setFilter(String filter) + { + this._filter = filter; + } + + public FilterType getFilterType() + { + return _filterType; + } + + public void setFilterType(FilterType type) + { + _filterType = type; + } + + public boolean isNoLocal() + { + return _isNoLocal; + } + + public void setNoLocal(boolean noLocal) + { + _isNoLocal = noLocal; + } + + public int getCapacity() + { + return _capacity; + } + + public void setCapacity(int capacity) + { + this._capacity = capacity; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java new file mode 100644 index 0000000000..24686cab17 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.client.messaging.address; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.naming.OperationNotSupportedException; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQDestination.Binding; + +public abstract class Node +{ + protected int _nodeType = AMQDestination.UNKNOWN_TYPE; + protected boolean _isDurable; + protected boolean _isAutoDelete; + protected String _alternateExchange; + protected List _bindings = new ArrayList(); + + public int getType() + { + return _nodeType; + } + + public boolean isDurable() + { + return _isDurable; + } + + public void setDurable(boolean durable) + { + _isDurable = durable; + } + + public boolean isAutoDelete() + { + return _isAutoDelete; + } + + public void setAutoDelete(boolean autoDelete) + { + _isAutoDelete = autoDelete; + } + + public String getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(String altExchange) + { + _alternateExchange = altExchange; + } + + public List getBindings() + { + return _bindings; + } + + public void setBindings(List bindings) + { + _bindings = bindings; + } + + public void addBinding(Binding binding) { + this._bindings.add(binding); + } + + public abstract Map getDeclareArgs(); + + public static class QueueNode extends Node + { + protected boolean _isExclusive; + protected QpidQueueOptions _queueOptions = new QpidQueueOptions(); + + public QueueNode() + { + _nodeType = AMQDestination.QUEUE_TYPE; + } + + public boolean isExclusive() + { + return _isExclusive; + } + + public void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + + public Map getDeclareArgs() + { + return _queueOptions; + } + + public void setDeclareArgs(QpidQueueOptions options) + { + _queueOptions = options; + } + } + + public static class ExchangeNode extends Node + { + protected QpidExchangeOptions _exchangeOptions = new QpidExchangeOptions(); + protected String _exchangeType; + + public ExchangeNode() + { + _nodeType = AMQDestination.TOPIC_TYPE; + } + + public String getExchangeType() + { + return _exchangeType; + } + + public void setExchangeType(String exchangeType) + { + _exchangeType = exchangeType; + } + + public Map getDeclareArgs() + { + return _exchangeOptions; + } + + public void setDeclareArgs(QpidExchangeOptions options) + { + _exchangeOptions = options; + } + } + + public static class UnknownNodeType extends Node + { + public Map getDeclareArgs() + { + return Collections.emptyMap(); + } + } +} -- cgit v1.2.1