summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-02-03 17:31:04 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-02-03 17:31:04 +0000
commitb3ef5ba7d29d82d7371553c56e77b9e38f986e57 (patch)
tree00eb2700c6ecea1cf2ca9db4f27104a378381757 /java/client
parent9c24857371fac993e3f3a22d10a426d5734f2368 (diff)
downloadqpid-python-b3ef5ba7d29d82d7371553c56e77b9e38f986e57.tar.gz
This is related to QPID-1831
I added the patch attached to the above JIRA with modifications. The modifications include integration with the address parser added by Rafi, and several refactoring and bug fixes to the original patch. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@906142 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java298
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java32
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java302
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java65
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java8
9 files changed, 632 insertions, 91 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
index 4bb2c12cc8..a5c6f5f967 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import java.net.URISyntaxException;
+
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.BindingURL;
@@ -38,6 +40,11 @@ public class AMQAnyDestination extends AMQDestination
{
super(binding);
}
+
+ public AMQAnyDestination(String str) throws URISyntaxException
+ {
+ super(str);
+ }
public AMQAnyDestination(AMQShortString exchangeName,AMQShortString exchangeClass,
AMQShortString routingKey,boolean isExclusive,
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index d6f91daae0..653e049002 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -54,7 +54,6 @@ import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-import org.apache.configuration.ClientProperties;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
@@ -63,6 +62,7 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicQosBody;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 0d1a89a6c0..38e5b4fee0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -29,10 +29,10 @@ import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.XASession;
-import org.apache.configuration.ClientProperties;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 311ef1f486..1ed64e7890 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -21,6 +21,9 @@
package org.apache.qpid.client;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import javax.jms.Destination;
import javax.naming.NamingException;
@@ -28,26 +31,35 @@ import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
+import org.apache.qpid.client.messaging.address.AddressHelper;
+import org.apache.qpid.client.messaging.address.QpidExchangeOptions;
+import org.apache.qpid.client.messaging.address.QpidQueueOptions;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AMQDestination implements Destination, Referenceable
{
- protected final AMQShortString _exchangeName;
+ private static final Logger _logger = LoggerFactory.getLogger(AMQDestination.class);
+
+ protected AMQShortString _exchangeName;
- protected final AMQShortString _exchangeClass;
+ protected AMQShortString _exchangeClass;
- protected final boolean _isDurable;
+ protected boolean _isDurable;
- protected final boolean _isExclusive;
+ protected boolean _isExclusive;
- protected final boolean _isAutoDelete;
+ protected boolean _isAutoDelete;
- private final boolean _browseOnly;
+ private boolean _browseOnly;
private AMQShortString _queueName;
@@ -70,13 +82,107 @@ public abstract class AMQDestination implements Destination, Referenceable
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
-
- protected AMQDestination(String url) throws URISyntaxException
+
+ // ----- Fields required to support new address syntax -------
+
+ public enum DestSyntax {
+ BURL,ADDR;
+
+ public static DestSyntax getSyntaxType(String s)
+ {
+ if (("BURL").equals(s))
+ {
+ return BURL;
+ }
+ else if (("ADDR").equals(s))
+ {
+ return ADDR;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid Destination Syntax Type" +
+ " should be one of {BURL|ADDR}");
+ }
+ }
+ }
+
+ public enum AddressOption {
+ ALWAYS, NEVER, SENDER, RECEIVER;
+
+ public static AddressOption getOption(String str)
+ {
+ if ("always".equals(str)) return ALWAYS;
+ else if ("never".equals(str)) return NEVER;
+ else if ("sender".equals(str)) return SENDER;
+ else if ("receiver".equals(str)) return RECEIVER;
+ else throw new IllegalArgumentException(str + " is not an allowed value");
+ }
+ }
+
+ public enum FilterType { SQL92, XQUERY, SUBJECT }
+
+ protected static DestSyntax defaultDestSyntax;
+
+ protected DestSyntax _destSyntax;
+
+ protected Address _address;
+ protected String _name;
+ protected String _subject;
+ protected AddressOption _create = AddressOption.NEVER;
+ protected AddressOption _assert = AddressOption.ALWAYS;
+ protected AddressOption _delete = AddressOption.NEVER;
+
+ protected String _filter;
+ protected FilterType _filterType = FilterType.SUBJECT;
+ protected boolean _isNoLocal;
+ protected int _nodeType = QUEUE_TYPE;
+ protected String _alternateExchange;
+ protected QpidQueueOptions _queueOptions;
+ protected QpidExchangeOptions _exchangeOptions;
+ protected List<Binding> _bindings = new ArrayList<Binding>();
+ // ----- / Fields required to support new address syntax -------
+
+ static
+ {
+ defaultDestSyntax = DestSyntax.getSyntaxType(
+ System.getProperty(ClientProperties.DEST_SYNTAX,
+ DestSyntax.BURL.toString()));
+ }
+
+ protected AMQDestination(Address address)
+ {
+ this._address = address;
+ getInfoFromAddress();
+ _destSyntax = DestSyntax.ADDR;
+ _logger.info("Based on " + address + " the selected destination syntax is " + _destSyntax);
+ }
+
+ protected AMQDestination(String str) throws URISyntaxException
+ {
+ if (str.startsWith("BURL:") ||
+ (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL))
+ {
+ _destSyntax = DestSyntax.BURL;
+ getInfoFromBindingURL(new AMQBindingURL(str));
+ }
+ else
+ {
+ _destSyntax = DestSyntax.ADDR;
+ this._address = createAddressFromString(str);
+ getInfoFromAddress();
+ }
+ _logger.info("Based on " + str + " the selected destination syntax is " + _destSyntax);
+ }
+
+ //retained for legacy support
+ protected AMQDestination(BindingURL binding)
{
- this(new AMQBindingURL(url));
+ getInfoFromBindingURL(binding);
+ _destSyntax = DestSyntax.BURL;
+ _logger.info("Based on " + binding + " the selected destination syntax is " + _destSyntax);
}
- protected AMQDestination(BindingURL binding)
+ protected void getInfoFromBindingURL(BindingURL binding)
{
_exchangeName = binding.getExchangeName();
_exchangeClass = binding.getExchangeClass();
@@ -153,7 +259,9 @@ public abstract class AMQDestination implements Destination, Referenceable
_queueName = queueName;
_isDurable = isDurable;
_bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
+ _destSyntax = DestSyntax.BURL;
_browseOnly = browseOnly;
+ _logger.info("Based on " + toString() + " the selected destination syntax is " + _destSyntax);
}
public AMQShortString getEncodedName()
@@ -243,7 +351,14 @@ public abstract class AMQDestination implements Destination, Referenceable
public String toString()
{
- return toURL();
+ if (_destSyntax == DestSyntax.BURL)
+ {
+ return toURL();
+ }
+ else
+ {
+ return _address.toString();
+ }
}
@@ -424,8 +539,8 @@ public abstract class AMQDestination implements Destination, Referenceable
public int hashCode()
{
int result;
- result = _exchangeName.hashCode();
- result = 29 * result + _exchangeClass.hashCode();
+ result = _exchangeName == null ? "".hashCode() : _exchangeName.hashCode();
+ result = 29 * result + (_exchangeClass == null ? "".hashCode() :_exchangeClass.hashCode());
//result = 29 * result + _destinationName.hashCode();
if (_queueName != null)
{
@@ -513,6 +628,163 @@ public abstract class AMQDestination implements Destination, Referenceable
}
}
+ // ----- new address syntax -----------
+ public static class Binding
+ {
+ String exchange;
+ String bindingKey;
+ Map<String,Object> args;
+
+ public Binding(String exchange,String bindingKey,Map<String,Object> args)
+ {
+ this.exchange = exchange;
+ this.bindingKey = bindingKey;
+ this.args = args;
+ }
+
+ public String getExchange()
+ {
+ return exchange;
+ }
+
+ public String getBindingKey()
+ {
+ return bindingKey;
+ }
+
+ public Map<String, Object> getArgs()
+ {
+ return args;
+ }
+ }
+
+ public Address getAddress() {
+ return _address;
+ }
+
+ public String getName() {
+ return _name;
+ }
+
+ public String getSubject() {
+ return _subject;
+ }
+
+ public AddressOption getCreate() {
+ return _create;
+ }
+
+ public AddressOption getAssert() {
+ return _assert;
+ }
+
+ public AddressOption getDelete() {
+ return _delete;
+ }
+
+ public String getFilter() {
+ return _filter;
+ }
+
+ public FilterType getFilterType() {
+ return _filterType;
+ }
+
+ public boolean isNoLocal() {
+ return _isNoLocal;
+ }
+
+ public int getNodeType() {
+ return _nodeType;
+ }
+
+ public QpidQueueOptions getQueueOptions() {
+ return _queueOptions;
+ }
+
+ public List<Binding> getBindings() {
+ return _bindings;
+ }
+
+ public void addBinding(Binding binding) {
+ this._bindings.add(binding);
+ }
+
+ public DestSyntax getDestSyntax() {
+ return _destSyntax;
+ }
+
+ public QpidExchangeOptions getExchangeOptions() {
+ return _exchangeOptions;
+ }
+
+ public String getAlternateExchange() {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(String alternateExchange) {
+ this._alternateExchange = alternateExchange;
+ }
+
+ public void setExchangeName(AMQShortString name)
+ {
+ this._exchangeName = name;
+ }
+
+ public void setExchangeClass(AMQShortString type)
+ {
+ this._exchangeClass = type;
+ }
+
+ public void setRoutingKey(AMQShortString rk)
+ {
+ this._routingKey = rk;
+ }
+
+ private Address createAddressFromString(String str)
+ {
+ if (str.startsWith("ADDR:"))
+ {
+ str = str.substring(str.indexOf(':')+1,str.length());
+ }
+ return Address.parse(str);
+ }
+
+ private void getInfoFromAddress()
+ {
+ _name = _address.getName();
+ _subject = _address.getSubject();
+
+ AddressHelper addrHelper = new AddressHelper(_address);
+
+ _create = addrHelper.getCreate() != null ?
+ AddressOption.getOption(addrHelper.getCreate()):AddressOption.NEVER;
+
+ _assert = addrHelper.getAssert() != null ?
+ AddressOption.getOption(addrHelper.getAssert()):AddressOption.ALWAYS;
+
+ _delete = addrHelper.getDelete() != null ?
+ AddressOption.getOption(addrHelper.getDelete()):AddressOption.NEVER;
+
+ _filter = addrHelper.getFilter();
+ _isNoLocal = addrHelper.isNoLocal();
+ _isDurable = addrHelper.isDurable();
+ _isAutoDelete = addrHelper.isAutoDelete();
+ _isExclusive = addrHelper.isExclusive();
+ _browseOnly = addrHelper.isBrowseOnly();
+
+ _nodeType = addrHelper.getNodeType() == null || addrHelper.getNodeType().equals("queue")?
+ QUEUE_TYPE : TOPIC_TYPE;
+
+ _alternateExchange = addrHelper.getAltExchange();
+
+ _queueOptions = addrHelper.getQpidQueueOptions();
+ _exchangeOptions = addrHelper.getQpidExchangeOptions();
+ _bindings = addrHelper.getBindings();
+ }
+
+ // ----- / new address syntax -----------
+
public boolean isBrowseOnly()
{
return _browseOnly;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 9f934d1055..be7af6b21f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -28,6 +28,8 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.client.AMQDestination.AddressOption;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -203,7 +205,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
-
+
protected final boolean USE_AMQP_ENCODED_MAP_MESSAGE;
/** System property to enable strict AMQP compliance. */
@@ -368,7 +370,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private boolean _dirty;
/** Has failover occured on this session with outstanding actions to commit? */
private boolean _failedOverDirty;
-
+
private static final class FlowControlIndicator
{
private volatile boolean _flowControl = true;
@@ -2095,7 +2097,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (tempDest.getSession() != this)
{
_logger.debug("destination is on different session");
- throw new JMSException("Cannot consume from a temporary destination created onanother session");
+ throw new JMSException("Cannot consume from a temporary destination created on another session");
}
if (tempDest.isDeleted())
@@ -2301,7 +2303,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
long producerId = getNextProducerId();
P producer = createMessageProducer(destination, mandatory,
- immediate, waitUntilSent, producerId);
+ immediate, waitUntilSent, producerId);
registerProducer(producerId, producer);
return producer;
@@ -2535,15 +2537,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
AMQProtocolHandler protocolHandler = getProtocolHandler();
- if (DECLARE_EXCHANGES)
+ if (amqd.getDestSyntax() == DestSyntax.ADDR)
{
- declareExchange(amqd, protocolHandler, nowait);
+ handleAddressBasedDestination(amqd,true,nowait);
}
-
- if (DECLARE_QUEUES || amqd.isNameRequired())
+ else
{
- declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+ if (DECLARE_EXCHANGES)
+ {
+ declareExchange(amqd, protocolHandler, nowait);
+ }
+
+ if (DECLARE_QUEUES || amqd.isNameRequired())
+ {
+ declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+ }
}
+
AMQShortString queueName = amqd.getAMQQueueName();
// store the consumer queue name
@@ -2589,6 +2599,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ public abstract void handleAddressBasedDestination(AMQDestination dest,
+ boolean isConsumer,
+ boolean noWait) throws AMQException;
+
private void registerProducer(long producerId, MessageProducer producer)
{
_producers.put(new Long(producerId), producer);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 71d6066c01..018613800c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -17,33 +17,60 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
+import static org.apache.qpid.transport.Option.BATCH;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
+import java.lang.ref.WeakReference;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.AMQDestination.AddressOption;
+import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.FiledTableSupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.util.Serial;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.ExchangeQueryResult;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.QueueQueryResult;
import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.util.Serial;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,7 +150,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
private TimerTask flushTask = null;
private RangeSet unacked = new RangeSet();
- private int unackedCount = 0;
+ private int unackedCount = 0;
/**
* USed to store the range of in tx messages
@@ -305,18 +332,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
final AMQDestination destination, final boolean nowait)
throws AMQException, FailoverException
{
- Map args = FiledTableSupport.convertToMap(arguments);
- // this is there only becasue the broker may expect a value for x-match
- if( ! args.containsKey("x-match") )
+ if (destination.getDestSyntax() == DestSyntax.BURL)
{
- args.put("x-match", "any");
+ Map args = FiledTableSupport.convertToMap(arguments);
+ // this is there only becasue the broker may expect a value for x-match
+ if( ! args.containsKey("x-match") )
+ {
+ args.put("x-match", "any");
+ }
+
+ for (AMQShortString rk: destination.getBindingKeys())
+ {
+ _logger.debug("Binding queue : " + queueName.toString() +
+ " exchange: " + exchangeName.toString() +
+ " using binding key " + rk.asString());
+ getQpidSession().exchangeBind(queueName.toString(),
+ exchangeName.toString(),
+ rk.toString(),
+ args);
+ }
}
-
- for (AMQShortString rk: destination.getBindingKeys())
+ else
{
- _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
- getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
+ for (Binding binding: destination.getBindings())
+ {
+ _logger.debug("Binding queue : " + queueName.toString() +
+ " exchange: " + binding.getExchange() +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + printMap(binding.getArgs()));
+ getQpidSession().exchangeBind(queueName.toString(),
+ binding.getExchange(),
+ binding.getBindingKey(),
+ binding.getArgs());
+ }
}
+
if (!nowait)
{
// We need to sync so that we get notify of an error.
@@ -470,8 +520,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
throws JMSException
{
- String rk = null;
- boolean res;
+ String rk = null;
if (bindingKeys != null && bindingKeys.length>0)
{
rk = bindingKeys[0].toString();
@@ -480,18 +529,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
rk = routingKey.toString();
}
-
+ return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null);
+ }
+
+ public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
+ throws JMSException
+ {
+ boolean res;
ExchangeBoundResult bindingQueryResult =
- getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get();
+ getQpidSession().exchangeBound(exchangeName,queueName, bindingKey, args).get();
- if (rk == null)
+ if (bindingKey == null)
{
res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
}
else
- {
- res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
- .getQueueNotMatched());
+ {
+ if (args == null)
+ {
+ res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+ .getQueueNotMatched());
+ }
+ else
+ {
+ res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+ .getQueueNotMatched() || bindingQueryResult.getArgsNotMatched());
+ }
}
return res;
}
@@ -566,15 +629,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* creates an exchange if it does not already exist
*/
- public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait)
+ public void sendExchangeDeclare(final AMQShortString name,
+ final AMQShortString type,
+ final AMQProtocolHandler protocolHandler, final boolean nowait)
throws AMQException, FailoverException
{
- getQpidSession().exchangeDeclare(name.toString(),
- type.toString(),
- null,
- null,
- name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
+ sendExchangeDeclare(name.asString(), type.asString(), null, null,
+ nowait);
+ }
+
+ public void sendExchangeDeclare(final String name, final String type,
+ final String alternateExchange, final Map<String, Object> args,
+ final boolean nowait) throws AMQException
+ {
+ getQpidSession().exchangeDeclare(
+ name,
+ type,
+ alternateExchange,
+ args,
+ name.toString().startsWith("amq.") ? Option.PASSIVE
+ : Option.NONE);
// We need to sync so that we get notify of an error.
if (!nowait)
{
@@ -598,28 +672,35 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
final boolean noLocal, final boolean nowait)
- throws AMQException, FailoverException
+ throws AMQException
{
- AMQShortString res;
+ AMQShortString queueName;
if (amqd.getAMQQueueName() == null)
{
// generate a name for this queue
- res = new AMQShortString("TempQueue" + UUID.randomUUID());
+ queueName = new AMQShortString("TempQueue" + UUID.randomUUID());
+ amqd.setQueueName(queueName);
}
else
{
- res = amqd.getAMQQueueName();
+ queueName = amqd.getAMQQueueName();
}
- Map<String,Object> arguments = null;
- if (noLocal)
- {
- arguments = new HashMap<String,Object>();
+
+ Map<String,Object> arguments = new HashMap<String,Object>();
+ if (noLocal || amqd.isNoLocal())
+ {
arguments.put("no-local", true);
}
- getQpidSession().queueDeclare(res.toString(), null, arguments,
+
+ if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null)
+ {
+ arguments.putAll(amqd.getQueueOptions());
+ }
+
+ getQpidSession().queueDeclare(queueName.toString(), amqd.getAlternateExchange() , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
amqd.isDurable() ? Option.DURABLE : Option.NONE,
- !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
if (!nowait)
{
@@ -627,7 +708,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().sync();
getCurrentException();
}
- return res;
+ return queueName;
}
/**
@@ -934,5 +1015,136 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
return AMQMessageDelegateFactory.FACTORY_0_10;
}
+
+ public boolean isExchangeExist(AMQDestination dest,boolean assertNode)
+ {
+ boolean match = true;
+ ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getName(), Option.NONE).get();
+ match = !result.getNotFound();
+
+ if (match && assertNode)
+ {
+ match = (result.getDurable() == dest.isDurable()) &&
+ (dest.getExchangeClass().asString().equals(result.getType())) &&
+ (matchProps(result.getArguments(),dest.getQueueOptions()));
+ }
+ if (match)
+ {
+ dest.setExchangeClass(new AMQShortString(result.getType()));
+ }
+
+ return match;
+ }
+
+ public boolean isQueueExist(AMQDestination dest,boolean assertNode)
+ {
+ boolean match = true;
+ QueueQueryResult result = getQpidSession().queueQuery(dest.getName(), Option.NONE).get();
+ match = dest.getName().equals(result.getQueue());
+
+ if (match && assertNode)
+ {
+ match = (result.getDurable() == dest.isDurable()) &&
+ (result.getAutoDelete() == dest.isAutoDelete()) &&
+ (result.getExclusive() == dest.isExclusive()) &&
+ (matchProps(result.getArguments(),dest.getQueueOptions()));
+ }
+
+ return match;
+ }
+
+ private boolean matchProps(Map<String,Object> target,Map<String,Object> source)
+ {
+ boolean match = true;
+ for (String key: source.keySet())
+ {
+ match = target.containsKey(key) &&
+ target.get(key).equals(source.get(key));
+
+ if (!match) return match;
+ }
+
+ return match;
+ }
+ public void handleAddressBasedDestination(AMQDestination dest,
+ boolean isConsumer,
+ boolean noWait) throws AMQException
+ {
+ boolean noLocal = dest.isNoLocal();
+ boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
+ (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getAssert() == AddressOption.SENDER);
+
+
+ if (isExchangeExist(dest,assertNode))
+ {
+ dest.setExchangeName(new AMQShortString(dest.getName()));
+ dest.setRoutingKey(new AMQShortString(dest.getSubject()));
+ if (isConsumer)
+ {
+ dest.setQueueName(null);
+ dest.addBinding(new Binding(dest.getName(),
+ dest.getSubject(),
+ null));
+ }
+ }
+ else if (isQueueExist(dest,assertNode))
+ {
+ dest.setQueueName(new AMQShortString(dest.getName()));
+ dest.setExchangeName(new AMQShortString(""));
+ dest.setExchangeClass(new AMQShortString(""));
+ dest.setRoutingKey(dest.getAMQQueueName());
+ }
+ else if (dest.getCreate() == AddressOption.ALWAYS ||
+ dest.getCreate() == AddressOption.RECEIVER && isConsumer ||
+ dest.getCreate() == AddressOption.SENDER && !isConsumer)
+ {
+ if (dest.getNodeType() == AMQDestination.QUEUE_TYPE)
+ {
+ dest.setQueueName(new AMQShortString(dest.getName()));
+ dest.setExchangeName(new AMQShortString(""));
+ dest.setExchangeClass(new AMQShortString(""));
+ dest.setRoutingKey(dest.getAMQQueueName());
+ }
+ else
+ {
+ dest.setQueueName(null);
+ dest.setExchangeName(new AMQShortString(dest.getName()));
+ dest.setExchangeClass(dest.getExchangeClass() == null?
+ ExchangeDefaults.TOPIC_EXCHANGE_CLASS:dest.getExchangeClass());
+ dest.setRoutingKey(new AMQShortString(dest.getSubject()));
+ dest.addBinding(new Binding(dest.getName(),
+ dest.getSubject(),
+ null));
+
+ sendExchangeDeclare(dest.getName(), dest.getExchangeClass().asString(),
+ dest.getAlternateExchange(), dest.getExchangeOptions(),false);
+
+ }
+
+ send0_10QueueDeclare(dest,null,noLocal,noWait);
+ }
+ else
+ {
+ throw new AMQException("The name supplied in the address doesn't resolve to an exchange or a queue");
+ }
+ }
+
+ /** This should be moved to a suitable utility class */
+ private String printMap(Map<String,Object> map)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("<");
+ if (map != null)
+ {
+ for(String key : map.keySet())
+ {
+ sb.append(key).append(" = ").append(map.get(key)).append(" ");
+ }
+ }
+ sb.append(">");
+ return sb.toString();
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 9b84421612..edcdbebba9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -590,4 +590,11 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
}
+ public void handleAddressBasedDestination(AMQDestination dest,
+ boolean isConsumer,
+ boolean noWait) throws AMQException
+ {
+ throw new UnsupportedOperationException("The new addressing based sytanx is "
+ + "not supported for AMQP 0-8/0-9 versions");
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index a1b5ce6f4c..4cc419b0cf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -17,34 +17,39 @@
*/
package org.apache.qpid.client;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+
import java.nio.ByteBuffer;
+import java.util.UUID;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.DeliveryMode;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.AMQDestination.AddressOption;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
+import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.Option;
import org.apache.qpid.util.Strings;
-import org.apache.qpid.njms.ExceptionHelper;
-import org.apache.qpid.transport.*;
-import static org.apache.qpid.transport.Option.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a 0_10 message producer.
*/
public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
+ private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_10.class);
private byte[] userIDBytes;
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
@@ -59,12 +64,27 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
void declareDestination(AMQDestination destination)
{
- String name = destination.getExchangeName().toString();
- ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
- (name,
- destination.getExchangeClass().toString(),
- null, null,
- name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
+ if (destination.getDestSyntax() == DestSyntax.BURL)
+ {
+ String name = destination.getExchangeName().toString();
+ ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
+ (name,
+ destination.getExchangeClass().toString(),
+ null, null,
+ name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
+ }
+ else
+ {
+ try
+ {
+ getSession().handleAddressBasedDestination(destination,false,false);
+ }
+ catch(Exception e)
+ {
+ // Idealy this should be thrown to the JMS layer.
+ _logger.warn("Exception occured while verifying destination",e);
+ }
+ }
}
//--- Overwritten methods
@@ -136,7 +156,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority));
message.setJMSPriority(priority);
}
- String exchangeName = destination.getExchangeName().toString();
+ String exchangeName = destination.getExchangeName() == null ? "" : destination.getExchangeName().toString();
if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
{
deliveryProp.setExchange(exchangeName);
@@ -166,7 +186,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
org.apache.mina.common.ByteBuffer data = message.getData();
ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
- ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE,
+ ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(),
+ MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED,
new Header(deliveryProp, messageProps),
buffer, sync ? SYNC : NONE);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index 9b2a6693e1..dd8377a94a 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -175,4 +175,12 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public void sync()
{
}
+
+ public void handleAddressBasedDestination(AMQDestination dest,
+ boolean isConsumer,
+ boolean noWait) throws AMQException
+ {
+ throw new UnsupportedOperationException("The new addressing based sytanx is "
+ + "not supported for AMQP 0-8/0-9 versions");
+ }
}