diff options
Diffstat (limited to 'qpid/java/client/src')
9 files changed, 632 insertions, 91 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java index 4bb2c12cc8..a5c6f5f967 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import java.net.URISyntaxException; + import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.BindingURL; @@ -38,6 +40,11 @@ public class AMQAnyDestination extends AMQDestination { super(binding); } + + public AMQAnyDestination(String str) throws URISyntaxException + { + super(str); + } public AMQAnyDestination(AMQShortString exchangeName,AMQShortString exchangeClass, AMQShortString routingKey,boolean isExclusive, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index d6f91daae0..653e049002 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -54,7 +54,6 @@ import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; -import org.apache.configuration.ClientProperties; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; @@ -63,6 +62,7 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicQosBody; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 0d1a89a6c0..38e5b4fee0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -29,10 +29,10 @@ import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; -import org.apache.configuration.ClientProperties; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 311ef1f486..1ed64e7890 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -21,6 +21,9 @@ package org.apache.qpid.client; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import javax.jms.Destination; import javax.naming.NamingException; @@ -28,26 +31,35 @@ import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; +import org.apache.qpid.client.messaging.address.AddressHelper; +import org.apache.qpid.client.messaging.address.QpidExchangeOptions; +import org.apache.qpid.client.messaging.address.QpidQueueOptions; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AMQDestination implements Destination, Referenceable { - protected final AMQShortString _exchangeName; + private static final Logger _logger = LoggerFactory.getLogger(AMQDestination.class); + + protected AMQShortString _exchangeName; - protected final AMQShortString _exchangeClass; + protected AMQShortString _exchangeClass; - protected final boolean _isDurable; + protected boolean _isDurable; - protected final boolean _isExclusive; + protected boolean _isExclusive; - protected final boolean _isAutoDelete; + protected boolean _isAutoDelete; - private final boolean _browseOnly; + private boolean _browseOnly; private AMQShortString _queueName; @@ -70,13 +82,107 @@ public abstract class AMQDestination implements Destination, Referenceable public static final int QUEUE_TYPE = 1; public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; - - protected AMQDestination(String url) throws URISyntaxException + + // ----- Fields required to support new address syntax ------- + + public enum DestSyntax { + BURL,ADDR; + + public static DestSyntax getSyntaxType(String s) + { + if (("BURL").equals(s)) + { + return BURL; + } + else if (("ADDR").equals(s)) + { + return ADDR; + } + else + { + throw new IllegalArgumentException("Invalid Destination Syntax Type" + + " should be one of {BURL|ADDR}"); + } + } + } + + public enum AddressOption { + ALWAYS, NEVER, SENDER, RECEIVER; + + public static AddressOption getOption(String str) + { + if ("always".equals(str)) return ALWAYS; + else if ("never".equals(str)) return NEVER; + else if ("sender".equals(str)) return SENDER; + else if ("receiver".equals(str)) return RECEIVER; + else throw new IllegalArgumentException(str + " is not an allowed value"); + } + } + + public enum FilterType { SQL92, XQUERY, SUBJECT } + + protected static DestSyntax defaultDestSyntax; + + protected DestSyntax _destSyntax; + + protected Address _address; + protected String _name; + protected String _subject; + protected AddressOption _create = AddressOption.NEVER; + protected AddressOption _assert = AddressOption.ALWAYS; + protected AddressOption _delete = AddressOption.NEVER; + + protected String _filter; + protected FilterType _filterType = FilterType.SUBJECT; + protected boolean _isNoLocal; + protected int _nodeType = QUEUE_TYPE; + protected String _alternateExchange; + protected QpidQueueOptions _queueOptions; + protected QpidExchangeOptions _exchangeOptions; + protected List<Binding> _bindings = new ArrayList<Binding>(); + // ----- / Fields required to support new address syntax ------- + + static + { + defaultDestSyntax = DestSyntax.getSyntaxType( + System.getProperty(ClientProperties.DEST_SYNTAX, + DestSyntax.BURL.toString())); + } + + protected AMQDestination(Address address) + { + this._address = address; + getInfoFromAddress(); + _destSyntax = DestSyntax.ADDR; + _logger.info("Based on " + address + " the selected destination syntax is " + _destSyntax); + } + + protected AMQDestination(String str) throws URISyntaxException + { + if (str.startsWith("BURL:") || + (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL)) + { + _destSyntax = DestSyntax.BURL; + getInfoFromBindingURL(new AMQBindingURL(str)); + } + else + { + _destSyntax = DestSyntax.ADDR; + this._address = createAddressFromString(str); + getInfoFromAddress(); + } + _logger.info("Based on " + str + " the selected destination syntax is " + _destSyntax); + } + + //retained for legacy support + protected AMQDestination(BindingURL binding) { - this(new AMQBindingURL(url)); + getInfoFromBindingURL(binding); + _destSyntax = DestSyntax.BURL; + _logger.info("Based on " + binding + " the selected destination syntax is " + _destSyntax); } - protected AMQDestination(BindingURL binding) + protected void getInfoFromBindingURL(BindingURL binding) { _exchangeName = binding.getExchangeName(); _exchangeClass = binding.getExchangeClass(); @@ -153,7 +259,9 @@ public abstract class AMQDestination implements Destination, Referenceable _queueName = queueName; _isDurable = isDurable; _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys; + _destSyntax = DestSyntax.BURL; _browseOnly = browseOnly; + _logger.info("Based on " + toString() + " the selected destination syntax is " + _destSyntax); } public AMQShortString getEncodedName() @@ -243,7 +351,14 @@ public abstract class AMQDestination implements Destination, Referenceable public String toString() { - return toURL(); + if (_destSyntax == DestSyntax.BURL) + { + return toURL(); + } + else + { + return _address.toString(); + } } @@ -424,8 +539,8 @@ public abstract class AMQDestination implements Destination, Referenceable public int hashCode() { int result; - result = _exchangeName.hashCode(); - result = 29 * result + _exchangeClass.hashCode(); + result = _exchangeName == null ? "".hashCode() : _exchangeName.hashCode(); + result = 29 * result + (_exchangeClass == null ? "".hashCode() :_exchangeClass.hashCode()); //result = 29 * result + _destinationName.hashCode(); if (_queueName != null) { @@ -513,6 +628,163 @@ public abstract class AMQDestination implements Destination, Referenceable } } + // ----- new address syntax ----------- + public static class Binding + { + String exchange; + String bindingKey; + Map<String,Object> args; + + public Binding(String exchange,String bindingKey,Map<String,Object> args) + { + this.exchange = exchange; + this.bindingKey = bindingKey; + this.args = args; + } + + public String getExchange() + { + return exchange; + } + + public String getBindingKey() + { + return bindingKey; + } + + public Map<String, Object> getArgs() + { + return args; + } + } + + public Address getAddress() { + return _address; + } + + public String getName() { + return _name; + } + + public String getSubject() { + return _subject; + } + + public AddressOption getCreate() { + return _create; + } + + public AddressOption getAssert() { + return _assert; + } + + public AddressOption getDelete() { + return _delete; + } + + public String getFilter() { + return _filter; + } + + public FilterType getFilterType() { + return _filterType; + } + + public boolean isNoLocal() { + return _isNoLocal; + } + + public int getNodeType() { + return _nodeType; + } + + public QpidQueueOptions getQueueOptions() { + return _queueOptions; + } + + public List<Binding> getBindings() { + return _bindings; + } + + public void addBinding(Binding binding) { + this._bindings.add(binding); + } + + public DestSyntax getDestSyntax() { + return _destSyntax; + } + + public QpidExchangeOptions getExchangeOptions() { + return _exchangeOptions; + } + + public String getAlternateExchange() { + return _alternateExchange; + } + + public void setAlternateExchange(String alternateExchange) { + this._alternateExchange = alternateExchange; + } + + public void setExchangeName(AMQShortString name) + { + this._exchangeName = name; + } + + public void setExchangeClass(AMQShortString type) + { + this._exchangeClass = type; + } + + public void setRoutingKey(AMQShortString rk) + { + this._routingKey = rk; + } + + private Address createAddressFromString(String str) + { + if (str.startsWith("ADDR:")) + { + str = str.substring(str.indexOf(':')+1,str.length()); + } + return Address.parse(str); + } + + private void getInfoFromAddress() + { + _name = _address.getName(); + _subject = _address.getSubject(); + + AddressHelper addrHelper = new AddressHelper(_address); + + _create = addrHelper.getCreate() != null ? + AddressOption.getOption(addrHelper.getCreate()):AddressOption.NEVER; + + _assert = addrHelper.getAssert() != null ? + AddressOption.getOption(addrHelper.getAssert()):AddressOption.ALWAYS; + + _delete = addrHelper.getDelete() != null ? + AddressOption.getOption(addrHelper.getDelete()):AddressOption.NEVER; + + _filter = addrHelper.getFilter(); + _isNoLocal = addrHelper.isNoLocal(); + _isDurable = addrHelper.isDurable(); + _isAutoDelete = addrHelper.isAutoDelete(); + _isExclusive = addrHelper.isExclusive(); + _browseOnly = addrHelper.isBrowseOnly(); + + _nodeType = addrHelper.getNodeType() == null || addrHelper.getNodeType().equals("queue")? + QUEUE_TYPE : TOPIC_TYPE; + + _alternateExchange = addrHelper.getAltExchange(); + + _queueOptions = addrHelper.getQpidQueueOptions(); + _exchangeOptions = addrHelper.getQpidExchangeOptions(); + _bindings = addrHelper.getBindings(); + } + + // ----- / new address syntax ----------- + public boolean isBrowseOnly() { return _browseOnly; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9f934d1055..be7af6b21f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -28,6 +28,8 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.client.AMQDestination.AddressOption; +import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -203,7 +205,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic protected final boolean DECLARE_EXCHANGES = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); - + protected final boolean USE_AMQP_ENCODED_MAP_MESSAGE; /** System property to enable strict AMQP compliance. */ @@ -368,7 +370,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private boolean _dirty; /** Has failover occured on this session with outstanding actions to commit? */ private boolean _failedOverDirty; - + private static final class FlowControlIndicator { private volatile boolean _flowControl = true; @@ -2095,7 +2097,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (tempDest.getSession() != this) { _logger.debug("destination is on different session"); - throw new JMSException("Cannot consume from a temporary destination created onanother session"); + throw new JMSException("Cannot consume from a temporary destination created on another session"); } if (tempDest.isDeleted()) @@ -2301,7 +2303,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkNotClosed(); long producerId = getNextProducerId(); P producer = createMessageProducer(destination, mandatory, - immediate, waitUntilSent, producerId); + immediate, waitUntilSent, producerId); registerProducer(producerId, producer); return producer; @@ -2535,15 +2537,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic AMQProtocolHandler protocolHandler = getProtocolHandler(); - if (DECLARE_EXCHANGES) + if (amqd.getDestSyntax() == DestSyntax.ADDR) { - declareExchange(amqd, protocolHandler, nowait); + handleAddressBasedDestination(amqd,true,nowait); } - - if (DECLARE_QUEUES || amqd.isNameRequired()) + else { - declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); + if (DECLARE_EXCHANGES) + { + declareExchange(amqd, protocolHandler, nowait); + } + + if (DECLARE_QUEUES || amqd.isNameRequired()) + { + declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); + } } + AMQShortString queueName = amqd.getAMQQueueName(); // store the consumer queue name @@ -2589,6 +2599,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + public abstract void handleAddressBasedDestination(AMQDestination dest, + boolean isConsumer, + boolean noWait) throws AMQException; + private void registerProducer(long producerId, MessageProducer producer) { _producers.put(new Long(producerId), producer); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 71d6066c01..018613800c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -17,33 +17,60 @@ */ package org.apache.qpid.client; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; +import static org.apache.qpid.transport.Option.BATCH; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import static org.apache.qpid.transport.Option.UNRELIABLE; + +import java.lang.ref.WeakReference; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; + +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; + import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.client.AMQDestination.AddressOption; +import org.apache.qpid.client.AMQDestination.Binding; +import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.FiledTableSupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AMQPEncodedMapMessage; +import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.client.message.JMSMapMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; -import org.apache.qpid.util.Serial; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.ExchangeQueryResult; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; import org.apache.qpid.transport.MessageFlowMode; import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.ExchangeBoundResult; -import org.apache.qpid.transport.Future; +import org.apache.qpid.transport.QueueQueryResult; import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.util.Serial; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,7 +150,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000); private TimerTask flushTask = null; private RangeSet unacked = new RangeSet(); - private int unackedCount = 0; + private int unackedCount = 0; /** * USed to store the range of in tx messages @@ -305,18 +332,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic final AMQDestination destination, final boolean nowait) throws AMQException, FailoverException { - Map args = FiledTableSupport.convertToMap(arguments); - // this is there only becasue the broker may expect a value for x-match - if( ! args.containsKey("x-match") ) + if (destination.getDestSyntax() == DestSyntax.BURL) { - args.put("x-match", "any"); + Map args = FiledTableSupport.convertToMap(arguments); + // this is there only becasue the broker may expect a value for x-match + if( ! args.containsKey("x-match") ) + { + args.put("x-match", "any"); + } + + for (AMQShortString rk: destination.getBindingKeys()) + { + _logger.debug("Binding queue : " + queueName.toString() + + " exchange: " + exchangeName.toString() + + " using binding key " + rk.asString()); + getQpidSession().exchangeBind(queueName.toString(), + exchangeName.toString(), + rk.toString(), + args); + } } - - for (AMQShortString rk: destination.getBindingKeys()) + else { - _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); - getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); + for (Binding binding: destination.getBindings()) + { + _logger.debug("Binding queue : " + queueName.toString() + + " exchange: " + binding.getExchange() + + " using binding key " + binding.getBindingKey() + + " with args " + printMap(binding.getArgs())); + getQpidSession().exchangeBind(queueName.toString(), + binding.getExchange(), + binding.getBindingKey(), + binding.getArgs()); + } } + if (!nowait) { // We need to sync so that we get notify of an error. @@ -470,8 +520,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys) throws JMSException { - String rk = null; - boolean res; + String rk = null; if (bindingKeys != null && bindingKeys.length>0) { rk = bindingKeys[0].toString(); @@ -480,18 +529,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { rk = routingKey.toString(); } - + return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null); + } + + public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) + throws JMSException + { + boolean res; ExchangeBoundResult bindingQueryResult = - getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get(); + getQpidSession().exchangeBound(exchangeName,queueName, bindingKey, args).get(); - if (rk == null) + if (bindingKey == null) { res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound()); } else - { - res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult - .getQueueNotMatched()); + { + if (args == null) + { + res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult + .getQueueNotMatched()); + } + else + { + res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult + .getQueueNotMatched() || bindingQueryResult.getArgsNotMatched()); + } } return res; } @@ -566,15 +629,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * creates an exchange if it does not already exist */ - public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) + public void sendExchangeDeclare(final AMQShortString name, + final AMQShortString type, + final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException, FailoverException { - getQpidSession().exchangeDeclare(name.toString(), - type.toString(), - null, - null, - name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE); + sendExchangeDeclare(name.asString(), type.asString(), null, null, + nowait); + } + + public void sendExchangeDeclare(final String name, final String type, + final String alternateExchange, final Map<String, Object> args, + final boolean nowait) throws AMQException + { + getQpidSession().exchangeDeclare( + name, + type, + alternateExchange, + args, + name.toString().startsWith("amq.") ? Option.PASSIVE + : Option.NONE); // We need to sync so that we get notify of an error. if (!nowait) { @@ -598,28 +672,35 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, final boolean noLocal, final boolean nowait) - throws AMQException, FailoverException + throws AMQException { - AMQShortString res; + AMQShortString queueName; if (amqd.getAMQQueueName() == null) { // generate a name for this queue - res = new AMQShortString("TempQueue" + UUID.randomUUID()); + queueName = new AMQShortString("TempQueue" + UUID.randomUUID()); + amqd.setQueueName(queueName); } else { - res = amqd.getAMQQueueName(); + queueName = amqd.getAMQQueueName(); } - Map<String,Object> arguments = null; - if (noLocal) - { - arguments = new HashMap<String,Object>(); + + Map<String,Object> arguments = new HashMap<String,Object>(); + if (noLocal || amqd.isNoLocal()) + { arguments.put("no-local", true); } - getQpidSession().queueDeclare(res.toString(), null, arguments, + + if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null) + { + arguments.putAll(amqd.getQueueOptions()); + } + + getQpidSession().queueDeclare(queueName.toString(), amqd.getAlternateExchange() , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, - !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false if (!nowait) { @@ -627,7 +708,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().sync(); getCurrentException(); } - return res; + return queueName; } /** @@ -934,5 +1015,136 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { return AMQMessageDelegateFactory.FACTORY_0_10; } + + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) + { + boolean match = true; + ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getName(), Option.NONE).get(); + match = !result.getNotFound(); + + if (match && assertNode) + { + match = (result.getDurable() == dest.isDurable()) && + (dest.getExchangeClass().asString().equals(result.getType())) && + (matchProps(result.getArguments(),dest.getQueueOptions())); + } + if (match) + { + dest.setExchangeClass(new AMQShortString(result.getType())); + } + + return match; + } + + public boolean isQueueExist(AMQDestination dest,boolean assertNode) + { + boolean match = true; + QueueQueryResult result = getQpidSession().queueQuery(dest.getName(), Option.NONE).get(); + match = dest.getName().equals(result.getQueue()); + + if (match && assertNode) + { + match = (result.getDurable() == dest.isDurable()) && + (result.getAutoDelete() == dest.isAutoDelete()) && + (result.getExclusive() == dest.isExclusive()) && + (matchProps(result.getArguments(),dest.getQueueOptions())); + } + + return match; + } + + private boolean matchProps(Map<String,Object> target,Map<String,Object> source) + { + boolean match = true; + for (String key: source.keySet()) + { + match = target.containsKey(key) && + target.get(key).equals(source.get(key)); + + if (!match) return match; + } + + return match; + } + public void handleAddressBasedDestination(AMQDestination dest, + boolean isConsumer, + boolean noWait) throws AMQException + { + boolean noLocal = dest.isNoLocal(); + boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || + (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || + (!isConsumer && dest.getAssert() == AddressOption.SENDER); + + + if (isExchangeExist(dest,assertNode)) + { + dest.setExchangeName(new AMQShortString(dest.getName())); + dest.setRoutingKey(new AMQShortString(dest.getSubject())); + if (isConsumer) + { + dest.setQueueName(null); + dest.addBinding(new Binding(dest.getName(), + dest.getSubject(), + null)); + } + } + else if (isQueueExist(dest,assertNode)) + { + dest.setQueueName(new AMQShortString(dest.getName())); + dest.setExchangeName(new AMQShortString("")); + dest.setExchangeClass(new AMQShortString("")); + dest.setRoutingKey(dest.getAMQQueueName()); + } + else if (dest.getCreate() == AddressOption.ALWAYS || + dest.getCreate() == AddressOption.RECEIVER && isConsumer || + dest.getCreate() == AddressOption.SENDER && !isConsumer) + { + if (dest.getNodeType() == AMQDestination.QUEUE_TYPE) + { + dest.setQueueName(new AMQShortString(dest.getName())); + dest.setExchangeName(new AMQShortString("")); + dest.setExchangeClass(new AMQShortString("")); + dest.setRoutingKey(dest.getAMQQueueName()); + } + else + { + dest.setQueueName(null); + dest.setExchangeName(new AMQShortString(dest.getName())); + dest.setExchangeClass(dest.getExchangeClass() == null? + ExchangeDefaults.TOPIC_EXCHANGE_CLASS:dest.getExchangeClass()); + dest.setRoutingKey(new AMQShortString(dest.getSubject())); + dest.addBinding(new Binding(dest.getName(), + dest.getSubject(), + null)); + + sendExchangeDeclare(dest.getName(), dest.getExchangeClass().asString(), + dest.getAlternateExchange(), dest.getExchangeOptions(),false); + + } + + send0_10QueueDeclare(dest,null,noLocal,noWait); + } + else + { + throw new AMQException("The name supplied in the address doesn't resolve to an exchange or a queue"); + } + } + + /** This should be moved to a suitable utility class */ + private String printMap(Map<String,Object> map) + { + StringBuilder sb = new StringBuilder(); + sb.append("<"); + if (map != null) + { + for(String key : map.keySet()) + { + sb.append(key).append(" = ").append(map.get(key)).append(" "); + } + } + sb.append(">"); + return sb.toString(); + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 9b84421612..edcdbebba9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -590,4 +590,11 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } + public void handleAddressBasedDestination(AMQDestination dest, + boolean isConsumer, + boolean noWait) throws AMQException + { + throw new UnsupportedOperationException("The new addressing based sytanx is " + + "not supported for AMQP 0-8/0-9 versions"); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index a1b5ce6f4c..4cc419b0cf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -17,34 +17,39 @@ */ package org.apache.qpid.client; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; + import java.nio.ByteBuffer; +import java.util.UUID; +import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.DeliveryMode; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.client.AMQDestination.AddressOption; +import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.Option; import org.apache.qpid.util.Strings; -import org.apache.qpid.njms.ExceptionHelper; -import org.apache.qpid.transport.*; -import static org.apache.qpid.transport.Option.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is a 0_10 message producer. */ public class BasicMessageProducer_0_10 extends BasicMessageProducer { + private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_10.class); private byte[] userIDBytes; BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, @@ -59,12 +64,27 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer void declareDestination(AMQDestination destination) { - String name = destination.getExchangeName().toString(); - ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare - (name, - destination.getExchangeClass().toString(), - null, null, - name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); + if (destination.getDestSyntax() == DestSyntax.BURL) + { + String name = destination.getExchangeName().toString(); + ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare + (name, + destination.getExchangeClass().toString(), + null, null, + name.startsWith("amq.") ? Option.PASSIVE : Option.NONE); + } + else + { + try + { + getSession().handleAddressBasedDestination(destination,false,false); + } + catch(Exception e) + { + // Idealy this should be thrown to the JMS layer. + _logger.warn("Exception occured while verifying destination",e); + } + } } //--- Overwritten methods @@ -136,7 +156,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority)); message.setJMSPriority(priority); } - String exchangeName = destination.getExchangeName().toString(); + String exchangeName = destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(); if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName)) { deliveryProp.setExchange(exchangeName); @@ -166,7 +186,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer org.apache.mina.common.ByteBuffer data = message.getData(); ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice(); - ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE, + ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(), + MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProp, messageProps), buffer, sync ? SYNC : NONE); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index 9b2a6693e1..dd8377a94a 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -175,4 +175,12 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe public void sync() { } + + public void handleAddressBasedDestination(AMQDestination dest, + boolean isConsumer, + boolean noWait) throws AMQException + { + throw new UnsupportedOperationException("The new addressing based sytanx is " + + "not supported for AMQP 0-8/0-9 versions"); + } } |
