diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-23 15:31:04 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-23 15:31:04 +0000 |
| commit | 672f8e531132cb7780d1d09b6eb3a3d5e6ba397e (patch) | |
| tree | a446d3910989b4254517dcdb8dba18e950f96d12 /qpid/java/client/src/main | |
| parent | 1044edb5417c80edae3f211ec38dae4a58c4a647 (diff) | |
| download | qpid-python-672f8e531132cb7780d1d09b6eb3a3d5e6ba397e.tar.gz | |
QPID-6037 : [Java Client] Add experimental support for ADDR addressing to the 0-8/9/9-1 client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620036 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main')
8 files changed, 653 insertions, 248 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 5242629a91..9650ad76fb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.common.ServerPropertyNames; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -67,6 +68,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; private boolean _messageCompressionSupported; + private boolean _addrSyntaxSupported; public void closeConnection(long timeout) throws JMSException, AMQException { @@ -76,6 +78,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public AMQConnectionDelegate_8_0(AMQConnection conn) { _conn = conn; + _addrSyntaxSupported = + Boolean.parseBoolean(System.getProperty(ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8, + String.valueOf(ClientProperties.DEFAULT_ADDR_SYNTAX_0_8_SUPPORT))); } protected boolean checkException(Throwable thrown) @@ -429,4 +434,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return _messageCompressionSupported; } + + public boolean isAddrSyntaxSupported() + { + return _addrSyntaxSupported; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index e06fc0f1de..2714caf2a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -20,6 +20,20 @@ */ package org.apache.qpid.client; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Destination; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,20 +48,6 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; -import javax.jms.Destination; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.net.URISyntaxException; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - public abstract class AMQDestination implements Destination, Referenceable, Externalizable { @@ -813,7 +813,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte _address = addr; } - public int getAddressType(){ + public int getAddressType() + { return _addressType; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c2659194e2..0b299a22cd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -68,9 +68,11 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ListMessage; @@ -79,6 +81,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.Strings; /* * TODO Different FailoverSupport implementation are needed on the same method call, in different situations. For @@ -600,6 +603,128 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + public void setLegacyFieldsForQueueType(AMQDestination dest) + { + // legacy support + dest.setQueueName(new AMQShortString(dest.getAddressName())); + dest.setExchangeName(new AMQShortString("")); + dest.setExchangeClass(new AMQShortString("")); + dest.setRoutingKey(dest.getAMQQueueName()); + } + + public void setLegacyFieldsForTopicType(AMQDestination dest) + { + // legacy support + dest.setExchangeName(new AMQShortString(dest.getAddressName())); + Node node = dest.getNode(); + dest.setExchangeClass(node.getExchangeType() == null? + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS): + new AMQShortString(node.getExchangeType())); + dest.setRoutingKey(new AMQShortString(dest.getSubject())); + } + + protected void verifySubject(AMQDestination dest) throws AMQException + { + if (dest.getSubject() == null || dest.getSubject().trim().equals("")) + { + + if ("topic".equals(dest.getExchangeClass().toString())) + { + dest.setRoutingKey(new AMQShortString("#")); + dest.setSubject(dest.getRoutingKey().toString()); + } + else + { + dest.setRoutingKey(new AMQShortString("")); + dest.setSubject(""); + } + } + } + + public abstract boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws AMQException; + + public abstract boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException; + + /** + * 1. Try to resolve the address type (queue or exchange) + * 2. if type == queue, + * 2.1 verify queue exists or create if create == true + * 2.2 If not throw exception + * + * 3. if type == exchange, + * 3.1 verify exchange exists or create if create == true + * 3.2 if not throw exception + * 3.3 if exchange exists (or created) create subscription queue. + */ + + @SuppressWarnings("deprecation") + public void resolveAddress(AMQDestination dest, + boolean isConsumer, + boolean noLocal) throws AMQException + { + if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) + { + return; + } + else + { + boolean assertNode = (dest.getAssert() == AMQDestination.AddressOption.ALWAYS) || + (isConsumer && dest.getAssert() == AMQDestination.AddressOption.RECEIVER) || + (!isConsumer && dest.getAssert() == AMQDestination.AddressOption.SENDER); + + boolean createNode = (dest.getCreate() == AMQDestination.AddressOption.ALWAYS) || + (isConsumer && dest.getCreate() == AMQDestination.AddressOption.RECEIVER) || + (!isConsumer && dest.getCreate() == AMQDestination.AddressOption.SENDER); + + + + int type = resolveAddressType(dest); + + switch (type) + { + case AMQDestination.QUEUE_TYPE: + { + if(createNode) + { + setLegacyFieldsForQueueType(dest); + handleQueueNodeCreation(dest,noLocal); + break; + } + else if (isQueueExist(dest,assertNode)) + { + setLegacyFieldsForQueueType(dest); + break; + } + } + + case AMQDestination.TOPIC_TYPE: + { + if(createNode) + { + setLegacyFieldsForTopicType(dest); + verifySubject(dest); + handleExchangeNodeCreation(dest); + break; + } + else if (isExchangeExist(dest,assertNode)) + { + setLegacyFieldsForTopicType(dest); + verifySubject(dest); + break; + } + } + + default: + throw new AMQException( + "The name '" + dest.getAddressName() + + "' supplied in the address doesn't resolve to an exchange or a queue"); + } + dest.setAddressResolved(System.currentTimeMillis()); + } + } + + public abstract int resolveAddressType(AMQDestination dest) throws AMQException; + protected abstract void acknowledgeImpl() throws JMSException; /** @@ -2594,6 +2719,54 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + void handleLinkCreation(AMQDestination dest) throws AMQException + { + createBindings(dest, dest.getLink().getBindings()); + } + + + void createBindings(AMQDestination dest, List<AMQDestination.Binding> bindings) throws AMQException + { + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (AMQDestination.Binding binding: bindings) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + doBind(dest, binding, queue, exchange); + } + } + + protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException; + + abstract void handleExchangeNodeCreation(AMQDestination dest) throws AMQException; + + abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange) + throws AMQException; + public abstract void sendConsume(C consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException; @@ -2696,7 +2869,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @throws AMQException If the exchange cannot be declared for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ - private void declareExchange(final AMQShortString name, final AMQShortString type, + void declareExchange(final AMQShortString name, final AMQShortString type, final boolean nowait, final boolean durable, final boolean autoDelete, final boolean internal) throws AMQException { @@ -2710,9 +2883,53 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic }, _connection).execute(); } + void declareExchange(final AMQShortString name, final AMQShortString type, + final boolean nowait, final boolean durable, + final boolean autoDelete, final FieldTable arguments, + final boolean passive) throws AMQException + { + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendExchangeDeclare(name, type, nowait, durable, autoDelete, arguments, passive); + return null; + } + }, _connection).execute(); + } + + protected AMQShortString preprocessAddressTopic(final C consumer, + AMQShortString queueName) throws AMQException + { + if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) + { + if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) + { + String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); + + createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); + queueName = consumer.getDestination().getAMQQueueName(); + consumer.setQueuename(queueName); + } + handleLinkCreation(consumer.getDestination()); + } + return queueName; + } + + abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException; + public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException; + + public abstract void sendExchangeDeclare(final AMQShortString name, + final AMQShortString type, + final boolean nowait, + boolean durable, + boolean autoDelete, + FieldTable arguments, + final boolean passive) throws AMQException, FailoverException; + /** * Declares a queue for a JMS destination. * <p> @@ -2930,10 +3147,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey) throws AMQException; - public abstract void resolveAddress(AMQDestination dest, - boolean isConsumer, - boolean noLocal) throws AMQException; - private void registerProducer(long producerId, MessageProducer producer) { _producers.put(producerId, producer); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 19720ea386..46f999e452 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -42,7 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; @@ -57,7 +56,6 @@ import org.apache.qpid.client.messaging.address.Link; import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; @@ -395,10 +393,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic " exchange: " + exchange + " using binding key " + binding.getBindingKey() + " with args " + Strings.printMap(binding.getArgs())); - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); + doBind(destination, binding, queue, exchange); } } @@ -639,18 +634,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic boolean nowait, int tag) throws AMQException, FailoverException { - if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) - { - if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) - { - String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); - - createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); - queueName = consumer.getDestination().getAMQQueueName(); - consumer.setQueuename(queueName); - } - handleLinkCreation(consumer.getDestination()); - } + queueName = preprocessAddressTopic(consumer, queueName); boolean preAcquire = consumer.isPreAcquire(); AMQDestination destination = consumer.getDestination(); @@ -728,6 +712,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete); } + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException + { + sendExchangeDeclare(name.asString(), type.asString(), null, + arguments == null ? null : FieldTableSupport.convertToMap(arguments), + nowait, durable, autoDelete); + } + + public void sendExchangeDeclare(final String name, final String type, final String alternateExchange, final Map<String, Object> args, final boolean nowait, boolean durable, boolean autoDelete) throws AMQException @@ -1109,6 +1102,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } + @Override public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException { boolean match = true; @@ -1144,6 +1138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } + @Override public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { Node node = dest.getNode(); @@ -1218,84 +1213,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } - /** - * 1. Try to resolve the address type (queue or exchange) - * 2. if type == queue, - * 2.1 verify queue exists or create if create == true - * 2.2 If not throw exception - * - * 3. if type == exchange, - * 3.1 verify exchange exists or create if create == true - * 3.2 if not throw exception - * 3.3 if exchange exists (or created) create subscription queue. - */ - - @SuppressWarnings("deprecation") - public void resolveAddress(AMQDestination dest, - boolean isConsumer, - boolean noLocal) throws AMQException - { - if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) - { - return; - } - else - { - boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || - (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || - (!isConsumer && dest.getAssert() == AddressOption.SENDER); - - boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || - (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || - (!isConsumer && dest.getCreate() == AddressOption.SENDER); - - - - int type = resolveAddressType(dest); - - switch (type) - { - case AMQDestination.QUEUE_TYPE: - { - if(createNode) - { - setLegacyFieldsForQueueType(dest); - handleQueueNodeCreation(dest,noLocal); - break; - } - else if (isQueueExist(dest,assertNode)) - { - setLegacyFieldsForQueueType(dest); - break; - } - } - - case AMQDestination.TOPIC_TYPE: - { - if(createNode) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - handleExchangeNodeCreation(dest); - break; - } - else if (isExchangeExist(dest,assertNode)) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - break; - } - } - - default: - throw new AMQException( - "The name '" + dest.getAddressName() + - "' supplied in the address doesn't resolve to an exchange or a queue"); - } - dest.setAddressResolved(System.currentTimeMillis()); - } - } - + @Override public int resolveAddressType(AMQDestination dest) throws AMQException { int type = dest.getAddressType(); @@ -1325,24 +1243,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void verifySubject(AMQDestination dest) throws AMQException - { - if (dest.getSubject() == null || dest.getSubject().trim().equals("")) - { - - if ("topic".equals(dest.getExchangeClass().toString())) - { - dest.setRoutingKey(new AMQShortString("#")); - dest.setSubject(dest.getRoutingKey().toString()); - } - else - { - dest.setRoutingKey(new AMQShortString("")); - dest.setSubject(""); - } - } - } - + @Override void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException { Link link = dest.getLink(); @@ -1380,26 +1281,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic bindingArguments); } - public void setLegacyFieldsForQueueType(AMQDestination dest) - { - // legacy support - dest.setQueueName(new AMQShortString(dest.getAddressName())); - dest.setExchangeName(new AMQShortString("")); - dest.setExchangeClass(new AMQShortString("")); - dest.setRoutingKey(dest.getAMQQueueName()); - } - - public void setLegacyFiledsForTopicType(AMQDestination dest) - { - // legacy support - dest.setExchangeName(new AMQShortString(dest.getAddressName())); - Node node = dest.getNode(); - dest.setExchangeClass(node.getExchangeType() == null? - AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS): - new AMQShortString(node.getExchangeType())); - dest.setRoutingKey(new AMQShortString(dest.getSubject())); - } - protected void acknowledgeImpl() { RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags()); @@ -1488,7 +1369,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException + @Override + protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException { Node node = dest.getNode(); Map<String,Object> arguments = node.getDeclareArgs(); @@ -1506,6 +1388,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } + @Override void handleExchangeNodeCreation(AMQDestination dest) throws AMQException { Node node = dest.getNode(); @@ -1523,47 +1406,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } - void handleLinkCreation(AMQDestination dest) throws AMQException - { - createBindings(dest, dest.getLink().getBindings()); - } - - void createBindings(AMQDestination dest, List<Binding> bindings) + protected void doBind(final AMQDestination dest, final Binding binding, final String queue, final String exchange) { - String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest - .getAddressName() : "amq.topic"; - - String defaultQueueName = null; - if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) - { - defaultQueueName = dest.getQueueName(); - } - else - { - defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); - } - - for (Binding binding: bindings) - { - String queue = binding.getQueue() == null? - defaultQueueName: binding.getQueue(); - - String exchange = binding.getExchange() == null ? - defaultExchangeForBinding : - binding.getExchange(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Binding queue : " + queue + - " exchange: " + exchange + - " using binding key " + binding.getBindingKey() + - " with args " + Strings.printMap(binding.getArgs())); - } - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); - } + getQpidSession().exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); } void handleLinkDelete(AMQDestination dest) throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index dbbc300910..e5ca82f56a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.UUID; import javax.jms.Destination; import javax.jms.JMSException; @@ -48,10 +49,14 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.messaging.address.AddressHelper; +import org.apache.qpid.client.messaging.address.Link; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; @@ -230,9 +235,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe table.setObject(entry.getKey(), entry.getValue()); } } - QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table); - AMQFrame queueDeclare = body.generateFrame(getChannelId()); - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + sendQueueDeclare(name, durable, exclusive, autoDelete, table, false); } public void sendRecover() throws AMQException, FailoverException @@ -428,6 +431,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return (responseBody.getReplyCode() == 0); } + + protected boolean exchangeExists(final AMQShortString exchangeName) + throws AMQException + { + if(!getAMQConnection().getDelegate().supportsIsBound()) + { + return false; + } + + AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>( + new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + return sendExchangeBound(exchangeName, null, null); + + } + }, getAMQConnection()).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + // valid if no issues, or just no bindings + return (responseBody.getReplyCode() == 0 || responseBody.getReplyCode() == 3); + } + private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) throws AMQException, FailoverException @@ -444,6 +473,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe boolean nowait, int tag) throws AMQException, FailoverException { + queueName = preprocessAddressTopic(consumer, queueName); BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, @@ -468,6 +498,63 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } @Override + void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException + { + final Link link = dest.getLink(); + final String queueName ; + + if (dest.getQueueName() == null) + { + queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName(); + dest.setQueueName(new AMQShortString(queueName)); + } + else + { + queueName = dest.getQueueName(); + } + + final Link.SubscriptionQueue queueProps = link.getSubscriptionQueue(); + final Map<String,Object> arguments = queueProps.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + if (link.isDurable() && queueName.startsWith("TempQueue")) + { + throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link."); + } + + (new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + // not setting alternate exchange + sendQueueDeclare(AMQShortString.valueOf(queueName), + link.isDurable(), + queueProps.isExclusive(), + queueProps.isAutoDelete(), + FieldTable.convertToFieldTable(arguments), + false); + + return null; + } + }, getAMQConnection())).execute(); + + + Map<String,Object> bindingArguments = new HashMap<String, Object>(); + bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + + bindQueue(AMQShortString.valueOf(queueName), + AMQShortString.valueOf(dest.getSubject()), + FieldTable.convertToFieldTable(bindingArguments), + AMQShortString.valueOf(dest.getAddressName()),dest,false); + + } + + @Override public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { @@ -481,17 +568,52 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } + @Override + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException + { + //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path + + MethodRegistry methodRegistry = getMethodRegistry(); + ExchangeDeclareBody body = methodRegistry.createExchangeDeclareBody(getTicket(), + name, + type, + passive || name.toString().startsWith("amq."), + durable, + autoDelete, + false, + false, + arguments); + AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); + + getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException { + AMQShortString queueName = amqd.getAMQQueueName(); + boolean durable = amqd.isDurable(); + boolean exclusive = amqd.isExclusive(); + boolean autoDelete = amqd.isAutoDelete(); + FieldTable arguments = null; + sendQueueDeclare(queueName, durable, exclusive, autoDelete, arguments, passive); + } + + private void sendQueueDeclare(final AMQShortString queueName, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, final FieldTable arguments, final boolean passive) + throws AMQException, FailoverException + { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(), - amqd.getAMQQueueName(), + queueName, passive, - amqd.isDurable(), - amqd.isExclusive(), - amqd.isAutoDelete(), + durable, + exclusive, + autoDelete, false, - null); + arguments); AMQFrame queueDeclare = body.generateFrame(getChannelId()); @@ -733,13 +855,207 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } - public void resolveAddress(AMQDestination dest, - boolean isConsumer, - boolean noLocal) throws AMQException + @Override + public void resolveAddress(final AMQDestination dest, final boolean isConsumer, final boolean noLocal) + throws AMQException + { + if(!isAddrSyntaxSupported()) + { + throw new UnsupportedAddressSyntaxException(dest); + } + super.resolveAddress(dest, isConsumer, noLocal); + } + + private boolean isAddrSyntaxSupported() + { + return ((AMQConnectionDelegate_8_0)(getAMQConnection().getDelegate())).isAddrSyntaxSupported(); + } + + public int resolveAddressType(AMQDestination dest) throws AMQException + { + int type = dest.getAddressType(); + String name = dest.getAddressName(); + if (type != AMQDestination.UNKNOWN_TYPE) + { + return type; + } + else + { + boolean isExchange = exchangeExists(AMQShortString.valueOf(name)); + boolean isQueue = isBound(null,AMQShortString.valueOf(name), null); + + if (!isExchange && !isQueue) + { + type = dest instanceof AMQTopic ? AMQDestination.TOPIC_TYPE : AMQDestination.QUEUE_TYPE; + } + else if (!isExchange) + { + //name refers to a queue + type = AMQDestination.QUEUE_TYPE; + } + else if (!isQueue) + { + //name refers to an exchange + type = AMQDestination.TOPIC_TYPE; + } + else + { + //both a queue and exchange exist for that name + throw new AMQException("Ambiguous address, please specify queue or topic as node type"); + } + dest.setAddressType(type); + return type; + } + } + + protected void handleQueueNodeCreation(final AMQDestination dest, boolean noLocal) throws AMQException + { + final Node node = dest.getNode(); + final Map<String,Object> arguments = node.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + (new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDeclare(AMQShortString.valueOf(dest.getAddressName()), + node.isDurable(), + node.isExclusive(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(arguments), + false); + + return null; + } + }, getAMQConnection())).execute(); + + + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + { + Node node = dest.getNode(); + // can't set alt. exchange + declareExchange(AMQShortString.valueOf(dest.getAddressName()), + AMQShortString.valueOf(node.getExchangeType()), + false, + node.isDurable(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(node.getDeclareArgs()), false); + + // If bindings are specified without a queue name and is called by the producer, + // the broker will send an exception as expected. + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + + protected void doBind(final AMQDestination dest, + final AMQDestination.Binding binding, + final String queue, + final String exchange) throws AMQException + { + bindQueue(AMQShortString.valueOf(queue),AMQShortString.valueOf(binding.getBindingKey()), + FieldTable.convertToFieldTable(binding.getArgs()), + AMQShortString.valueOf(exchange),dest); + } + + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException + { + Node node = dest.getNode(); + return isQueueExist(dest.getAddressName(), assertNode, + node.isDurable(), node.isAutoDelete(), + node.isExclusive(), node.getDeclareArgs()); + } + + public boolean isQueueExist(final String queueName, boolean assertNode, + final boolean durable, final boolean autoDelete, + final boolean exclusive, final Map<String, Object> args) throws AMQException { - throw new UnsupportedAddressSyntaxException(dest); + boolean match = isBound(null,AMQShortString.valueOf(queueName), null); + + if (assertNode) + { + if(!match) + { + throw new AMQException("Assert failed for queue : " + queueName +". Queue does not exist." ); + + } + else + { + + new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDeclare(AMQShortString.valueOf(queueName), + durable, + exclusive, + autoDelete, + FieldTable.convertToFieldTable(args), + true); + + return null; + } + }, getAMQConnection()); + + } + } + + + return match; } + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException + { + boolean match = exchangeExists(AMQShortString.valueOf(dest.getAddressName())); + + Node node = dest.getNode(); + + if (match) + { + if (assertNode) + { + + declareExchange(AMQShortString.valueOf(dest.getAddressName()), + AMQShortString.valueOf(node.getExchangeType()), + false, + node.isDurable(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(node.getDeclareArgs()), true); + + } + else + { + // TODO - some way to determine the exchange type + /* + _logger.debug("Setting Exchange type " + result.getType()); + node.setExchangeType(result.getType()); + dest.setExchangeClass(new AMQShortString(result.getType())); + */ + + } + } + + if (assertNode) + { + if (!match) + { + throw new AMQException("Assert failed for address : " + dest +". Exchange not found."); + } + } + + return match; + } protected void flushAcknowledgments() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index f735895c81..23d65e15d8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client; +import javax.jms.JMSException; +import javax.jms.Message; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +41,6 @@ import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ConnectionURL; -import javax.jms.JMSException; -import javax.jms.Message; - public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8> { private final Logger _logger = LoggerFactory.getLogger(getClass()); @@ -71,6 +71,19 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe _topicDestinationCache = session.getTopicDestinationCache(); _queueDestinationCache = session.getQueueDestinationCache(); + + // This is due to the Destination carrying the temporary subscription name which is incorrect. + if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) + { + boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; + + if (!namedQueue) + { + setDestination(destination.copyDestination()); + getDestination().setQueueName(null); + } + } + if (destination.getRejectBehaviour() != null) { _rejectBehaviour = destination.getRejectBehaviour(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 355c456249..89bf146398 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -57,30 +57,34 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory); } - void declareDestination(AMQDestination destination) + void declareDestination(AMQDestination destination) throws AMQException { if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - throw new UnsupportedAddressSyntaxException(destination); + getSession().resolveAddress(destination, false, false); } - - if(getSession().isDeclareExchanges()) + else { - final MethodRegistry methodRegistry = getSession().getMethodRegistry(); - ExchangeDeclareBody body = - methodRegistry.createExchangeDeclareBody(getSession().getTicket(), - destination.getExchangeName(), - destination.getExchangeClass(), - destination.getExchangeName().toString().startsWith("amq."), - destination.isExchangeDurable(), - destination.isExchangeAutoDelete(), - destination.isExchangeInternal(), - true, - null); - AMQFrame declare = body.generateFrame(getChannelId()); - - getConnection().getProtocolHandler().writeFrame(declare); + if (getSession().isDeclareExchanges()) + { + final MethodRegistry methodRegistry = getSession().getMethodRegistry(); + ExchangeDeclareBody body = + methodRegistry.createExchangeDeclareBody(getSession().getTicket(), + destination.getExchangeName(), + destination.getExchangeClass(), + destination.getExchangeName() + .toString() + .startsWith("amq."), + destination.isExchangeDurable(), + destination.isExchangeAutoDelete(), + destination.isExchangeInternal(), + true, + null); + AMQFrame declare = body.generateFrame(getChannelId()); + + getConnection().getProtocolHandler().writeFrame(declare); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index ad9a37479e..bd089eb6a8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -21,6 +21,23 @@ package org.apache.qpid.client.message; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotWriteableException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +45,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.DestSyntax; -import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Message; @@ -42,22 +58,6 @@ import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.ReplyTo; import org.apache.qpid.transport.TransportException; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotWriteableException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - /** * This extends AbstractAMQMessageDelegate which contains common code between * both the 0_8 and 0_10 Message types. @@ -352,14 +352,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { try { - int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd); + int type = getAMQSession().resolveAddressType(amqd); if (type == AMQDestination.QUEUE_TYPE) { - ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd); + getAMQSession().setLegacyFieldsForQueueType(amqd); } else { - ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd); + getAMQSession().setLegacyFieldsForTopicType(amqd); } } catch(AMQException ex) |
