From 672f8e531132cb7780d1d09b6eb3a3d5e6ba397e Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 23 Aug 2014 15:31:04 +0000 Subject: QPID-6037 : [Java Client] Add experimental support for ADDR addressing to the 0-8/9/9-1 client git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620036 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/AMQConnectionDelegate_8_0.java | 10 + .../org/apache/qpid/client/AMQDestination.java | 31 +- .../java/org/apache/qpid/client/AMQSession.java | 223 +++++++++++++- .../org/apache/qpid/client/AMQSession_0_10.java | 198 ++---------- .../org/apache/qpid/client/AMQSession_0_8.java | 340 ++++++++++++++++++++- .../qpid/client/BasicMessageConsumer_0_8.java | 19 +- .../qpid/client/BasicMessageProducer_0_8.java | 40 +-- .../client/message/AMQMessageDelegate_0_10.java | 40 +-- 8 files changed, 653 insertions(+), 248 deletions(-) (limited to 'qpid/java/client/src/main') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 5242629a91..9650ad76fb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.common.ServerPropertyNames; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -67,6 +68,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; private boolean _messageCompressionSupported; + private boolean _addrSyntaxSupported; public void closeConnection(long timeout) throws JMSException, AMQException { @@ -76,6 +78,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public AMQConnectionDelegate_8_0(AMQConnection conn) { _conn = conn; + _addrSyntaxSupported = + Boolean.parseBoolean(System.getProperty(ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8, + String.valueOf(ClientProperties.DEFAULT_ADDR_SYNTAX_0_8_SUPPORT))); } protected boolean checkException(Throwable thrown) @@ -429,4 +434,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return _messageCompressionSupported; } + + public boolean isAddrSyntaxSupported() + { + return _addrSyntaxSupported; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index e06fc0f1de..2714caf2a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -20,6 +20,20 @@ */ package org.apache.qpid.client; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Destination; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,20 +48,6 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; -import javax.jms.Destination; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.net.URISyntaxException; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - public abstract class AMQDestination implements Destination, Referenceable, Externalizable { @@ -813,7 +813,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte _address = addr; } - public int getAddressType(){ + public int getAddressType() + { return _addressType; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c2659194e2..0b299a22cd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -68,9 +68,11 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ListMessage; @@ -79,6 +81,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.Strings; /* * TODO Different FailoverSupport implementation are needed on the same method call, in different situations. For @@ -600,6 +603,128 @@ public abstract class AMQSession bindings) throws AMQException + { + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (AMQDestination.Binding binding: bindings) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + doBind(dest, binding, queue, exchange); + } + } + + protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException; + + abstract void handleExchangeNodeCreation(AMQDestination dest) throws AMQException; + + abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange) + throws AMQException; + public abstract void sendConsume(C consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException; @@ -2696,7 +2869,7 @@ public abstract class AMQSession(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + sendExchangeDeclare(name, type, nowait, durable, autoDelete, arguments, passive); + return null; + } + }, _connection).execute(); + } + + protected AMQShortString preprocessAddressTopic(final C consumer, + AMQShortString queueName) throws AMQException + { + if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) + { + if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) + { + String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); + + createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); + queueName = consumer.getDestination().getAMQQueueName(); + consumer.setQueuename(queueName); + } + handleLinkCreation(consumer.getDestination()); + } + return queueName; + } + + abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException; + public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException; + + public abstract void sendExchangeDeclare(final AMQShortString name, + final AMQShortString type, + final boolean nowait, + boolean durable, + boolean autoDelete, + FieldTable arguments, + final boolean passive) throws AMQException, FailoverException; + /** * Declares a queue for a JMS destination. *

@@ -2930,10 +3147,6 @@ public abstract class AMQSession args, final boolean nowait, boolean durable, boolean autoDelete) throws AMQException @@ -1109,6 +1102,7 @@ public class AMQSession_0_10 extends AMQSession arguments = node.getDeclareArgs(); @@ -1506,6 +1388,7 @@ public class AMQSession_0_10 extends AMQSession bindings) + protected void doBind(final AMQDestination dest, final Binding binding, final String queue, final String exchange) { - String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest - .getAddressName() : "amq.topic"; - - String defaultQueueName = null; - if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) - { - defaultQueueName = dest.getQueueName(); - } - else - { - defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); - } - - for (Binding binding: bindings) - { - String queue = binding.getQueue() == null? - defaultQueueName: binding.getQueue(); - - String exchange = binding.getExchange() == null ? - defaultExchangeForBinding : - binding.getExchange(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Binding queue : " + queue + - " exchange: " + exchange + - " using binding key " + binding.getBindingKey() + - " with args " + Strings.printMap(binding.getArgs())); - } - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); - } + getQpidSession().exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); } void handleLinkDelete(AMQDestination dest) throws AMQException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index dbbc300910..e5ca82f56a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.UUID; import javax.jms.Destination; import javax.jms.JMSException; @@ -48,10 +49,14 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.messaging.address.AddressHelper; +import org.apache.qpid.client.messaging.address.Link; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; @@ -230,9 +235,7 @@ public class AMQSession_0_8 extends AMQSession( + new FailoverProtectedOperation() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + return sendExchangeBound(exchangeName, null, null); + + } + }, getAMQConnection()).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + // valid if no issues, or just no bindings + return (responseBody.getReplyCode() == 0 || responseBody.getReplyCode() == 3); + } + private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) throws AMQException, FailoverException @@ -444,6 +473,7 @@ public class AMQSession_0_8 extends AMQSession arguments = queueProps.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + if (link.isDurable() && queueName.startsWith("TempQueue")) + { + throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link."); + } + + (new FailoverNoopSupport( + new FailoverProtectedOperation() + { + public Void execute() throws AMQException, FailoverException + { + + // not setting alternate exchange + sendQueueDeclare(AMQShortString.valueOf(queueName), + link.isDurable(), + queueProps.isExclusive(), + queueProps.isAutoDelete(), + FieldTable.convertToFieldTable(arguments), + false); + + return null; + } + }, getAMQConnection())).execute(); + + + Map bindingArguments = new HashMap(); + bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + + bindQueue(AMQShortString.valueOf(queueName), + AMQShortString.valueOf(dest.getSubject()), + FieldTable.convertToFieldTable(bindingArguments), + AMQShortString.valueOf(dest.getAddressName()),dest,false); + + } + @Override public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException @@ -481,17 +568,52 @@ public class AMQSession_0_8 extends AMQSession arguments = node.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + (new FailoverNoopSupport( + new FailoverProtectedOperation() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDeclare(AMQShortString.valueOf(dest.getAddressName()), + node.isDurable(), + node.isExclusive(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(arguments), + false); + + return null; + } + }, getAMQConnection())).execute(); + + + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + { + Node node = dest.getNode(); + // can't set alt. exchange + declareExchange(AMQShortString.valueOf(dest.getAddressName()), + AMQShortString.valueOf(node.getExchangeType()), + false, + node.isDurable(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(node.getDeclareArgs()), false); + + // If bindings are specified without a queue name and is called by the producer, + // the broker will send an exception as expected. + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + + protected void doBind(final AMQDestination dest, + final AMQDestination.Binding binding, + final String queue, + final String exchange) throws AMQException + { + bindQueue(AMQShortString.valueOf(queue),AMQShortString.valueOf(binding.getBindingKey()), + FieldTable.convertToFieldTable(binding.getArgs()), + AMQShortString.valueOf(exchange),dest); + } + + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException + { + Node node = dest.getNode(); + return isQueueExist(dest.getAddressName(), assertNode, + node.isDurable(), node.isAutoDelete(), + node.isExclusive(), node.getDeclareArgs()); + } + + public boolean isQueueExist(final String queueName, boolean assertNode, + final boolean durable, final boolean autoDelete, + final boolean exclusive, final Map args) throws AMQException { - throw new UnsupportedAddressSyntaxException(dest); + boolean match = isBound(null,AMQShortString.valueOf(queueName), null); + + if (assertNode) + { + if(!match) + { + throw new AMQException("Assert failed for queue : " + queueName +". Queue does not exist." ); + + } + else + { + + new FailoverNoopSupport( + new FailoverProtectedOperation() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDeclare(AMQShortString.valueOf(queueName), + durable, + exclusive, + autoDelete, + FieldTable.convertToFieldTable(args), + true); + + return null; + } + }, getAMQConnection()); + + } + } + + + return match; } + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException + { + boolean match = exchangeExists(AMQShortString.valueOf(dest.getAddressName())); + + Node node = dest.getNode(); + + if (match) + { + if (assertNode) + { + + declareExchange(AMQShortString.valueOf(dest.getAddressName()), + AMQShortString.valueOf(node.getExchangeType()), + false, + node.isDurable(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(node.getDeclareArgs()), true); + + } + else + { + // TODO - some way to determine the exchange type + /* + _logger.debug("Setting Exchange type " + result.getType()); + node.setExchangeType(result.getType()); + dest.setExchangeClass(new AMQShortString(result.getType())); + */ + + } + } + + if (assertNode) + { + if (!match) + { + throw new AMQException("Assert failed for address : " + dest +". Exchange not found."); + } + } + + return match; + } protected void flushAcknowledgments() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index f735895c81..23d65e15d8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client; +import javax.jms.JMSException; +import javax.jms.Message; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +41,6 @@ import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ConnectionURL; -import javax.jms.JMSException; -import javax.jms.Message; - public class BasicMessageConsumer_0_8 extends BasicMessageConsumer { private final Logger _logger = LoggerFactory.getLogger(getClass()); @@ -71,6 +71,19 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer