From b3ef5ba7d29d82d7371553c56e77b9e38f986e57 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 3 Feb 2010 17:31:04 +0000 Subject: This is related to QPID-1831 I added the patch attached to the above JIRA with modifications. The modifications include integration with the address parser added by Rafi, and several refactoring and bug fixes to the original patch. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@906142 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQAnyDestination.java | 7 + .../java/org/apache/qpid/client/AMQConnection.java | 2 +- .../qpid/client/AMQConnectionDelegate_0_10.java | 2 +- .../org/apache/qpid/client/AMQDestination.java | 298 +++++++++++++++++++- .../java/org/apache/qpid/client/AMQSession.java | 32 ++- .../org/apache/qpid/client/AMQSession_0_10.java | 302 ++++++++++++++++++--- .../org/apache/qpid/client/AMQSession_0_8.java | 7 + .../qpid/client/BasicMessageProducer_0_10.java | 65 +++-- .../qpid/test/unit/message/TestAMQSession.java | 8 + 9 files changed, 632 insertions(+), 91 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java index 4bb2c12cc8..a5c6f5f967 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import java.net.URISyntaxException; + import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.BindingURL; @@ -38,6 +40,11 @@ public class AMQAnyDestination extends AMQDestination { super(binding); } + + public AMQAnyDestination(String str) throws URISyntaxException + { + super(str); + } public AMQAnyDestination(AMQShortString exchangeName,AMQShortString exchangeClass, AMQShortString routingKey,boolean isExclusive, diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index d6f91daae0..653e049002 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -54,7 +54,6 @@ import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; -import org.apache.configuration.ClientProperties; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; @@ -63,6 +62,7 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicQosBody; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 0d1a89a6c0..38e5b4fee0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -29,10 +29,10 @@ import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; -import org.apache.configuration.ClientProperties; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 311ef1f486..1ed64e7890 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -21,6 +21,9 @@ package org.apache.qpid.client; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import javax.jms.Destination; import javax.naming.NamingException; @@ -28,26 +31,35 @@ import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; +import org.apache.qpid.client.messaging.address.AddressHelper; +import org.apache.qpid.client.messaging.address.QpidExchangeOptions; +import org.apache.qpid.client.messaging.address.QpidQueueOptions; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AMQDestination implements Destination, Referenceable { - protected final AMQShortString _exchangeName; + private static final Logger _logger = LoggerFactory.getLogger(AMQDestination.class); + + protected AMQShortString _exchangeName; - protected final AMQShortString _exchangeClass; + protected AMQShortString _exchangeClass; - protected final boolean _isDurable; + protected boolean _isDurable; - protected final boolean _isExclusive; + protected boolean _isExclusive; - protected final boolean _isAutoDelete; + protected boolean _isAutoDelete; - private final boolean _browseOnly; + private boolean _browseOnly; private AMQShortString _queueName; @@ -70,13 +82,107 @@ public abstract class AMQDestination implements Destination, Referenceable public static final int QUEUE_TYPE = 1; public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; - - protected AMQDestination(String url) throws URISyntaxException + + // ----- Fields required to support new address syntax ------- + + public enum DestSyntax { + BURL,ADDR; + + public static DestSyntax getSyntaxType(String s) + { + if (("BURL").equals(s)) + { + return BURL; + } + else if (("ADDR").equals(s)) + { + return ADDR; + } + else + { + throw new IllegalArgumentException("Invalid Destination Syntax Type" + + " should be one of {BURL|ADDR}"); + } + } + } + + public enum AddressOption { + ALWAYS, NEVER, SENDER, RECEIVER; + + public static AddressOption getOption(String str) + { + if ("always".equals(str)) return ALWAYS; + else if ("never".equals(str)) return NEVER; + else if ("sender".equals(str)) return SENDER; + else if ("receiver".equals(str)) return RECEIVER; + else throw new IllegalArgumentException(str + " is not an allowed value"); + } + } + + public enum FilterType { SQL92, XQUERY, SUBJECT } + + protected static DestSyntax defaultDestSyntax; + + protected DestSyntax _destSyntax; + + protected Address _address; + protected String _name; + protected String _subject; + protected AddressOption _create = AddressOption.NEVER; + protected AddressOption _assert = AddressOption.ALWAYS; + protected AddressOption _delete = AddressOption.NEVER; + + protected String _filter; + protected FilterType _filterType = FilterType.SUBJECT; + protected boolean _isNoLocal; + protected int _nodeType = QUEUE_TYPE; + protected String _alternateExchange; + protected QpidQueueOptions _queueOptions; + protected QpidExchangeOptions _exchangeOptions; + protected List _bindings = new ArrayList(); + // ----- / Fields required to support new address syntax ------- + + static + { + defaultDestSyntax = DestSyntax.getSyntaxType( + System.getProperty(ClientProperties.DEST_SYNTAX, + DestSyntax.BURL.toString())); + } + + protected AMQDestination(Address address) + { + this._address = address; + getInfoFromAddress(); + _destSyntax = DestSyntax.ADDR; + _logger.info("Based on " + address + " the selected destination syntax is " + _destSyntax); + } + + protected AMQDestination(String str) throws URISyntaxException + { + if (str.startsWith("BURL:") || + (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL)) + { + _destSyntax = DestSyntax.BURL; + getInfoFromBindingURL(new AMQBindingURL(str)); + } + else + { + _destSyntax = DestSyntax.ADDR; + this._address = createAddressFromString(str); + getInfoFromAddress(); + } + _logger.info("Based on " + str + " the selected destination syntax is " + _destSyntax); + } + + //retained for legacy support + protected AMQDestination(BindingURL binding) { - this(new AMQBindingURL(url)); + getInfoFromBindingURL(binding); + _destSyntax = DestSyntax.BURL; + _logger.info("Based on " + binding + " the selected destination syntax is " + _destSyntax); } - protected AMQDestination(BindingURL binding) + protected void getInfoFromBindingURL(BindingURL binding) { _exchangeName = binding.getExchangeName(); _exchangeClass = binding.getExchangeClass(); @@ -153,7 +259,9 @@ public abstract class AMQDestination implements Destination, Referenceable _queueName = queueName; _isDurable = isDurable; _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys; + _destSyntax = DestSyntax.BURL; _browseOnly = browseOnly; + _logger.info("Based on " + toString() + " the selected destination syntax is " + _destSyntax); } public AMQShortString getEncodedName() @@ -243,7 +351,14 @@ public abstract class AMQDestination implements Destination, Referenceable public String toString() { - return toURL(); + if (_destSyntax == DestSyntax.BURL) + { + return toURL(); + } + else + { + return _address.toString(); + } } @@ -424,8 +539,8 @@ public abstract class AMQDestination implements Destination, Referenceable public int hashCode() { int result; - result = _exchangeName.hashCode(); - result = 29 * result + _exchangeClass.hashCode(); + result = _exchangeName == null ? "".hashCode() : _exchangeName.hashCode(); + result = 29 * result + (_exchangeClass == null ? "".hashCode() :_exchangeClass.hashCode()); //result = 29 * result + _destinationName.hashCode(); if (_queueName != null) { @@ -513,6 +628,163 @@ public abstract class AMQDestination implements Destination, Referenceable } } + // ----- new address syntax ----------- + public static class Binding + { + String exchange; + String bindingKey; + Map args; + + public Binding(String exchange,String bindingKey,Map args) + { + this.exchange = exchange; + this.bindingKey = bindingKey; + this.args = args; + } + + public String getExchange() + { + return exchange; + } + + public String getBindingKey() + { + return bindingKey; + } + + public Map getArgs() + { + return args; + } + } + + public Address getAddress() { + return _address; + } + + public String getName() { + return _name; + } + + public String getSubject() { + return _subject; + } + + public AddressOption getCreate() { + return _create; + } + + public AddressOption getAssert() { + return _assert; + } + + public AddressOption getDelete() { + return _delete; + } + + public String getFilter() { + return _filter; + } + + public FilterType getFilterType() { + return _filterType; + } + + public boolean isNoLocal() { + return _isNoLocal; + } + + public int getNodeType() { + return _nodeType; + } + + public QpidQueueOptions getQueueOptions() { + return _queueOptions; + } + + public List getBindings() { + return _bindings; + } + + public void addBinding(Binding binding) { + this._bindings.add(binding); + } + + public DestSyntax getDestSyntax() { + return _destSyntax; + } + + public QpidExchangeOptions getExchangeOptions() { + return _exchangeOptions; + } + + public String getAlternateExchange() { + return _alternateExchange; + } + + public void setAlternateExchange(String alternateExchange) { + this._alternateExchange = alternateExchange; + } + + public void setExchangeName(AMQShortString name) + { + this._exchangeName = name; + } + + public void setExchangeClass(AMQShortString type) + { + this._exchangeClass = type; + } + + public void setRoutingKey(AMQShortString rk) + { + this._routingKey = rk; + } + + private Address createAddressFromString(String str) + { + if (str.startsWith("ADDR:")) + { + str = str.substring(str.indexOf(':')+1,str.length()); + } + return Address.parse(str); + } + + private void getInfoFromAddress() + { + _name = _address.getName(); + _subject = _address.getSubject(); + + AddressHelper addrHelper = new AddressHelper(_address); + + _create = addrHelper.getCreate() != null ? + AddressOption.getOption(addrHelper.getCreate()):AddressOption.NEVER; + + _assert = addrHelper.getAssert() != null ? + AddressOption.getOption(addrHelper.getAssert()):AddressOption.ALWAYS; + + _delete = addrHelper.getDelete() != null ? + AddressOption.getOption(addrHelper.getDelete()):AddressOption.NEVER; + + _filter = addrHelper.getFilter(); + _isNoLocal = addrHelper.isNoLocal(); + _isDurable = addrHelper.isDurable(); + _isAutoDelete = addrHelper.isAutoDelete(); + _isExclusive = addrHelper.isExclusive(); + _browseOnly = addrHelper.isBrowseOnly(); + + _nodeType = addrHelper.getNodeType() == null || addrHelper.getNodeType().equals("queue")? + QUEUE_TYPE : TOPIC_TYPE; + + _alternateExchange = addrHelper.getAltExchange(); + + _queueOptions = addrHelper.getQpidQueueOptions(); + _exchangeOptions = addrHelper.getQpidExchangeOptions(); + _bindings = addrHelper.getBindings(); + } + + // ----- / new address syntax ----------- + public boolean isBrowseOnly() { return _browseOnly; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9f934d1055..be7af6b21f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -28,6 +28,8 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.client.AMQDestination.AddressOption; +import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -203,7 +205,7 @@ public abstract class AMQSession0) { rk = bindingKeys[0].toString(); @@ -480,18 +529,32 @@ public class AMQSession_0_10 extends AMQSession args) + throws JMSException + { + boolean res; ExchangeBoundResult bindingQueryResult = - getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get(); + getQpidSession().exchangeBound(exchangeName,queueName, bindingKey, args).get(); - if (rk == null) + if (bindingKey == null) { res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound()); } else - { - res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult - .getQueueNotMatched()); + { + if (args == null) + { + res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult + .getQueueNotMatched()); + } + else + { + res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult + .getQueueNotMatched() || bindingQueryResult.getArgsNotMatched()); + } } return res; } @@ -566,15 +629,26 @@ public class AMQSession_0_10 extends AMQSession args, + final boolean nowait) throws AMQException + { + getQpidSession().exchangeDeclare( + name, + type, + alternateExchange, + args, + name.toString().startsWith("amq.") ? Option.PASSIVE + : Option.NONE); // We need to sync so that we get notify of an error. if (!nowait) { @@ -598,28 +672,35 @@ public class AMQSession_0_10 extends AMQSession arguments = null; - if (noLocal) - { - arguments = new HashMap(); + + Map arguments = new HashMap(); + if (noLocal || amqd.isNoLocal()) + { arguments.put("no-local", true); } - getQpidSession().queueDeclare(res.toString(), null, arguments, + + if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null) + { + arguments.putAll(amqd.getQueueOptions()); + } + + getQpidSession().queueDeclare(queueName.toString(), amqd.getAlternateExchange() , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, - !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false if (!nowait) { @@ -627,7 +708,7 @@ public class AMQSession_0_10 extends AMQSession target,Map source) + { + boolean match = true; + for (String key: source.keySet()) + { + match = target.containsKey(key) && + target.get(key).equals(source.get(key)); + + if (!match) return match; + } + + return match; + } + public void handleAddressBasedDestination(AMQDestination dest, + boolean isConsumer, + boolean noWait) throws AMQException + { + boolean noLocal = dest.isNoLocal(); + boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || + (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || + (!isConsumer && dest.getAssert() == AddressOption.SENDER); + + + if (isExchangeExist(dest,assertNode)) + { + dest.setExchangeName(new AMQShortString(dest.getName())); + dest.setRoutingKey(new AMQShortString(dest.getSubject())); + if (isConsumer) + { + dest.setQueueName(null); + dest.addBinding(new Binding(dest.getName(), + dest.getSubject(), + null)); + } + } + else if (isQueueExist(dest,assertNode)) + { + dest.setQueueName(new AMQShortString(dest.getName())); + dest.setExchangeName(new AMQShortString("")); + dest.setExchangeClass(new AMQShortString("")); + dest.setRoutingKey(dest.getAMQQueueName()); + } + else if (dest.getCreate() == AddressOption.ALWAYS || + dest.getCreate() == AddressOption.RECEIVER && isConsumer || + dest.getCreate() == AddressOption.SENDER && !isConsumer) + { + if (dest.getNodeType() == AMQDestination.QUEUE_TYPE) + { + dest.setQueueName(new AMQShortString(dest.getName())); + dest.setExchangeName(new AMQShortString("")); + dest.setExchangeClass(new AMQShortString("")); + dest.setRoutingKey(dest.getAMQQueueName()); + } + else + { + dest.setQueueName(null); + dest.setExchangeName(new AMQShortString(dest.getName())); + dest.setExchangeClass(dest.getExchangeClass() == null? + ExchangeDefaults.TOPIC_EXCHANGE_CLASS:dest.getExchangeClass()); + dest.setRoutingKey(new AMQShortString(dest.getSubject())); + dest.addBinding(new Binding(dest.getName(), + dest.getSubject(), + null)); + + sendExchangeDeclare(dest.getName(), dest.getExchangeClass().asString(), + dest.getAlternateExchange(), dest.getExchangeOptions(),false); + + } + + send0_10QueueDeclare(dest,null,noLocal,noWait); + } + else + { + throw new AMQException("The name supplied in the address doesn't resolve to an exchange or a queue"); + } + } + + /** This should be moved to a suitable utility class */ + private String printMap(Map map) + { + StringBuilder sb = new StringBuilder(); + sb.append("<"); + if (map != null) + { + for(String key : map.keySet()) + { + sb.append(key).append(" = ").append(map.get(key)).append(" "); + } + } + sb.append(">"); + return sb.toString(); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 9b84421612..edcdbebba9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -590,4 +590,11 @@ public final class AMQSession_0_8 extends AMQSession