summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java298
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java32
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java302
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java65
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java8
9 files changed, 632 insertions, 91 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
index 4bb2c12cc8..a5c6f5f967 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import java.net.URISyntaxException;
+
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.BindingURL;
@@ -38,6 +40,11 @@ public class AMQAnyDestination extends AMQDestination
{
super(binding);
}
+
+ public AMQAnyDestination(String str) throws URISyntaxException
+ {
+ super(str);
+ }
public AMQAnyDestination(AMQShortString exchangeName,AMQShortString exchangeClass,
AMQShortString routingKey,boolean isExclusive,
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index d6f91daae0..653e049002 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -54,7 +54,6 @@ import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-import org.apache.configuration.ClientProperties;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
@@ -63,6 +62,7 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicQosBody;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 0d1a89a6c0..38e5b4fee0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -29,10 +29,10 @@ import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.XASession;
-import org.apache.configuration.ClientProperties;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 311ef1f486..1ed64e7890 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -21,6 +21,9 @@
package org.apache.qpid.client;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import javax.jms.Destination;
import javax.naming.NamingException;
@@ -28,26 +31,35 @@ import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
+import org.apache.qpid.client.messaging.address.AddressHelper;
+import org.apache.qpid.client.messaging.address.QpidExchangeOptions;
+import org.apache.qpid.client.messaging.address.QpidQueueOptions;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AMQDestination implements Destination, Referenceable
{
- protected final AMQShortString _exchangeName;
+ private static final Logger _logger = LoggerFactory.getLogger(AMQDestination.class);
+
+ protected AMQShortString _exchangeName;
- protected final AMQShortString _exchangeClass;
+ protected AMQShortString _exchangeClass;
- protected final boolean _isDurable;
+ protected boolean _isDurable;
- protected final boolean _isExclusive;
+ protected boolean _isExclusive;
- protected final boolean _isAutoDelete;
+ protected boolean _isAutoDelete;
- private final boolean _browseOnly;
+ private boolean _browseOnly;
private AMQShortString _queueName;
@@ -70,13 +82,107 @@ public abstract class AMQDestination implements Destination, Referenceable
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
-
- protected AMQDestination(String url) throws URISyntaxException
+
+ // ----- Fields required to support new address syntax -------
+
+ public enum DestSyntax {
+ BURL,ADDR;
+
+ public static DestSyntax getSyntaxType(String s)
+ {
+ if (("BURL").equals(s))
+ {
+ return BURL;
+ }
+ else if (("ADDR").equals(s))
+ {
+ return ADDR;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid Destination Syntax Type" +
+ " should be one of {BURL|ADDR}");
+ }
+ }
+ }
+
+ public enum AddressOption {
+ ALWAYS, NEVER, SENDER, RECEIVER;
+
+ public static AddressOption getOption(String str)
+ {
+ if ("always".equals(str)) return ALWAYS;
+ else if ("never".equals(str)) return NEVER;
+ else if ("sender".equals(str)) return SENDER;
+ else if ("receiver".equals(str)) return RECEIVER;
+ else throw new IllegalArgumentException(str + " is not an allowed value");
+ }
+ }
+
+ public enum FilterType { SQL92, XQUERY, SUBJECT }
+
+ protected static DestSyntax defaultDestSyntax;
+
+ protected DestSyntax _destSyntax;
+
+ protected Address _address;
+ protected String _name;
+ protected String _subject;
+ protected AddressOption _create = AddressOption.NEVER;
+ protected AddressOption _assert = AddressOption.ALWAYS;
+ protected AddressOption _delete = AddressOption.NEVER;
+
+ protected String _filter;
+ protected FilterType _filterType = FilterType.SUBJECT;
+ protected boolean _isNoLocal;
+ protected int _nodeType = QUEUE_TYPE;
+ protected String _alternateExchange;
+ protected QpidQueueOptions _queueOptions;
+ protected QpidExchangeOptions _exchangeOptions;
+ protected List<Binding> _bindings = new ArrayList<Binding>();
+ // ----- / Fields required to support new address syntax -------
+
+ static
+ {
+ defaultDestSyntax = DestSyntax.getSyntaxType(
+ System.getProperty(ClientProperties.DEST_SYNTAX,
+ DestSyntax.BURL.toString()));
+ }
+
+ protected AMQDestination(Address address)
+ {
+ this._address = address;
+ getInfoFromAddress();
+ _destSyntax = DestSyntax.ADDR;
+ _logger.info("Based on " + address + " the selected destination syntax is " + _destSyntax);
+ }
+
+ protected AMQDestination(String str) throws URISyntaxException
+ {
+ if (str.startsWith("BURL:") ||
+ (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL))
+ {
+ _destSyntax = DestSyntax.BURL;
+ getInfoFromBindingURL(new AMQBindingURL(str));
+ }
+ else
+ {
+ _destSyntax = DestSyntax.ADDR;
+ this._address = createAddressFromString(str);
+ getInfoFromAddress();
+ }
+ _logger.info("Based on " + str + " the selected destination syntax is " + _destSyntax);
+ }
+
+ //retained for legacy support
+ protected AMQDestination(BindingURL binding)
{
- this(new AMQBindingURL(url));
+ getInfoFromBindingURL(binding);
+ _destSyntax = DestSyntax.BURL;
+ _logger.info("Based on " + binding + " the selected destination syntax is " + _destSyntax);
}
- protected AMQDestination(BindingURL binding)
+ protected void getInfoFromBindingURL(BindingURL binding)
{
_exchangeName = binding.getExchangeName();
_exchangeClass = binding.getExchangeClass();
@@ -153,7 +259,9 @@ public abstract class AMQDestination implements Destination, Referenceable
_queueName = queueName;
_isDurable = isDurable;
_bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
+ _destSyntax = DestSyntax.BURL;
_browseOnly = browseOnly;
+ _logger.info("Based on " + toString() + " the selected destination syntax is " + _destSyntax);
}
public AMQShortString getEncodedName()
@@ -243,7 +351,14 @@ public abstract class AMQDestination implements Destination, Referenceable
public String toString()
{
- return toURL();
+ if (_destSyntax == DestSyntax.BURL)
+ {
+ return toURL();
+ }
+ else
+ {
+ return _address.toString();
+ }
}
@@ -424,8 +539,8 @@ public abstract class AMQDestination implements Destination, Referenceable
public int hashCode()
{
int result;
- result = _exchangeName.hashCode();
- result = 29 * result + _exchangeClass.hashCode();
+ result = _exchangeName == null ? "".hashCode() : _exchangeName.hashCode();
+ result = 29 * result + (_exchangeClass == null ? "".hashCode() :_exchangeClass.hashCode());
//result = 29 * result + _destinationName.hashCode();
if (_queueName != null)
{
@@ -513,6 +628,163 @@ public abstract class AMQDestination implements Destination, Referenceable
}
}
+ // ----- new address syntax -----------
+ public static class Binding
+ {
+ String exchange;
+ String bindingKey;
+ Map<String,Object> args;
+
+ public Binding(String exchange,String bindingKey,Map<String,Object> args)
+ {
+ this.exchange = exchange;
+ this.bindingKey = bindingKey;
+ this.args = args;
+ }
+
+ public String getExchange()
+ {
+ return exchange;
+ }
+
+ public String getBindingKey()
+ {
+ return bindingKey;
+ }
+
+ public Map<String, Object> getArgs()
+ {
+ return args;
+ }
+ }
+
+ public Address getAddress() {
+ return _address;
+ }
+
+ public String getName() {
+ return _name;
+ }
+
+ public String getSubject() {
+ return _subject;
+ }
+
+ public AddressOption getCreate() {
+ return _create;
+ }
+
+ public AddressOption getAssert() {
+ return _assert;
+ }
+
+ public AddressOption getDelete() {
+ return _delete;
+ }
+
+ public String getFilter() {
+ return _filter;
+ }
+
+ public FilterType getFilterType() {
+ return _filterType;
+ }
+
+ public boolean isNoLocal() {
+ return _isNoLocal;
+ }
+
+ public int getNodeType() {
+ return _nodeType;
+ }
+
+ public QpidQueueOptions getQueueOptions() {
+ return _queueOptions;
+ }
+
+ public List<Binding> getBindings() {
+ return _bindings;
+ }
+
+ public void addBinding(Binding binding) {
+ this._bindings.add(binding);
+ }
+
+ public DestSyntax getDestSyntax() {
+ return _destSyntax;
+ }
+
+ public QpidExchangeOptions getExchangeOptions() {
+ return _exchangeOptions;
+ }
+
+ public String getAlternateExchange() {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(String alternateExchange) {
+ this._alternateExchange = alternateExchange;
+ }
+
+ public void setExchangeName(AMQShortString name)
+ {
+ this._exchangeName = name;
+ }
+
+ public void setExchangeClass(AMQShortString type)
+ {
+ this._exchangeClass = type;
+ }
+
+ public void setRoutingKey(AMQShortString rk)
+ {
+ this._routingKey = rk;
+ }
+
+ private Address createAddressFromString(String str)
+ {
+ if (str.startsWith("ADDR:"))
+ {
+ str = str.substring(str.indexOf(':')+1,str.length());
+ }
+ return Address.parse(str);
+ }
+
+ private void getInfoFromAddress()
+ {
+ _name = _address.getName();
+ _subject = _address.getSubject();
+
+ AddressHelper addrHelper = new AddressHelper(_address);
+
+ _create = addrHelper.getCreate() != null ?
+ AddressOption.getOption(addrHelper.getCreate()):AddressOption.NEVER;
+
+ _assert = addrHelper.getAssert() != null ?
+ AddressOption.getOption(addrHelper.getAssert()):AddressOption.ALWAYS;
+
+ _delete = addrHelper.getDelete() != null ?
+ AddressOption.getOption(addrHelper.getDelete()):AddressOption.NEVER;
+
+ _filter = addrHelper.getFilter();
+ _isNoLocal = addrHelper.isNoLocal();
+ _isDurable = addrHelper.isDurable();
+ _isAutoDelete = addrHelper.isAutoDelete();
+ _isExclusive = addrHelper.isExclusive();
+ _browseOnly = addrHelper.isBrowseOnly();
+
+ _nodeType = addrHelper.getNodeType() == null || addrHelper.getNodeType().equals("queue")?
+ QUEUE_TYPE : TOPIC_TYPE;
+
+ _alternateExchange = addrHelper.getAltExchange();
+
+ _queueOptions = addrHelper.getQpidQueueOptions();
+ _exchangeOptions = addrHelper.getQpidExchangeOptions();
+ _bindings = addrHelper.getBindings();
+ }
+
+ // ----- / new address syntax -----------
+
public boolean isBrowseOnly()
{
return _browseOnly;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 9f934d1055..be7af6b21f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -28,6 +28,8 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.client.AMQDestination.AddressOption;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -203,7 +205,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
-
+
protected final boolean USE_AMQP_ENCODED_MAP_MESSAGE;
/** System property to enable strict AMQP compliance. */
@@ -368,7 +370,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private boolean _dirty;
/** Has failover occured on this session with outstanding actions to commit? */
private boolean _failedOverDirty;
-
+
private static final class FlowControlIndicator
{
private volatile boolean _flowControl = true;
@@ -2095,7 +2097,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (tempDest.getSession() != this)
{
_logger.debug("destination is on different session");
- throw new JMSException("Cannot consume from a temporary destination created onanother session");
+ throw new JMSException("Cannot consume from a temporary destination created on another session");
}
if (tempDest.isDeleted())
@@ -2301,7 +2303,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
long producerId = getNextProducerId();
P producer = createMessageProducer(destination, mandatory,
- immediate, waitUntilSent, producerId);
+ immediate, waitUntilSent, producerId);
registerProducer(producerId, producer);
return producer;
@@ -2535,15 +2537,23 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
AMQProtocolHandler protocolHandler = getProtocolHandler();
- if (DECLARE_EXCHANGES)
+ if (amqd.getDestSyntax() == DestSyntax.ADDR)
{
- declareExchange(amqd, protocolHandler, nowait);
+ handleAddressBasedDestination(amqd,true,nowait);
}
-
- if (DECLARE_QUEUES || amqd.isNameRequired())
+ else
{
- declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+ if (DECLARE_EXCHANGES)
+ {
+ declareExchange(amqd, protocolHandler, nowait);
+ }
+
+ if (DECLARE_QUEUES || amqd.isNameRequired())
+ {
+ declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+ }
}
+
AMQShortString queueName = amqd.getAMQQueueName();
// store the consumer queue name
@@ -2589,6 +2599,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ public abstract void handleAddressBasedDestination(AMQDestination dest,
+ boolean isConsumer,
+ boolean noWait) throws AMQException;
+
private void registerProducer(long producerId, MessageProducer producer)
{
_producers.put(new Long(producerId), producer);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 71d6066c01..018613800c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -17,33 +17,60 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
+import static org.apache.qpid.transport.Option.BATCH;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
+import java.lang.ref.WeakReference;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.AMQDestination.AddressOption;
+import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.FiledTableSupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.util.Serial;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.ExchangeBoundResult;
+import org.apache.qpid.transport.ExchangeQueryResult;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.QueueQueryResult;
import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.util.Serial;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,7 +150,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
private TimerTask flushTask = null;
private RangeSet unacked = new RangeSet();
- private int unackedCount = 0;
+ private int unackedCount = 0;
/**
* USed to store the range of in tx messages
@@ -305,18 +332,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
final AMQDestination destination, final boolean nowait)
throws AMQException, FailoverException
{
- Map args = FiledTableSupport.convertToMap(arguments);
- // this is there only becasue the broker may expect a value for x-match
- if( ! args.containsKey("x-match") )
+ if (destination.getDestSyntax() == DestSyntax.BURL)
{
- args.put("x-match", "any");
+ Map args = FiledTableSupport.convertToMap(arguments);
+ // this is there only becasue the broker may expect a value for x-match
+ if( ! args.containsKey("x-match") )
+ {
+ args.put("x-match", "any");
+ }
+
+ for (AMQShortString rk: destination.getBindingKeys())
+ {
+ _logger.debug("Binding queue : " + queueName.toString() +
+ " exchange: " + exchangeName.toString() +
+ " using binding key " + rk.asString());
+ getQpidSession().exchangeBind(queueName.toString(),
+ exchangeName.toString(),
+ rk.toString(),
+ args);
+ }
}
-
- for (AMQShortString rk: destination.getBindingKeys())
+ else
{
- _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
- getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
+ for (Binding binding: destination.getBindings())
+ {
+ _logger.debug("Binding queue : " + queueName.toString() +
+ " exchange: " + binding.getExchange() +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + printMap(binding.getArgs()));
+ getQpidSession().exchangeBind(queueName.toString(),
+ binding.getExchange(),
+ binding.getBindingKey(),
+ binding.getArgs());
+ }
}
+
if (!nowait)
{
// We need to sync so that we get notify of an error.
@@ -470,8 +520,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
throws JMSException
{
- String rk = null;
- boolean res;
+ String rk = null;
if (bindingKeys != null && bindingKeys.length>0)
{
rk = bindingKeys[0].toString();
@@ -480,18 +529,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
rk = routingKey.toString();
}
-
+ return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null);
+ }
+
+ public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
+ throws JMSException
+ {
+ boolean res;
ExchangeBoundResult bindingQueryResult =
- getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get();
+ getQpidSession().exchangeBound(exchangeName,queueName, bindingKey, args).get();
- if (rk == null)
+ if (bindingKey == null)
{
res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
}
else
- {
- res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
- .getQueueNotMatched());
+ {
+ if (args == null)
+ {
+ res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+ .getQueueNotMatched());
+ }
+ else
+ {
+ res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
+ .getQueueNotMatched() || bindingQueryResult.getArgsNotMatched());
+ }
}
return res;
}
@@ -566,15 +629,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* creates an exchange if it does not already exist
*/
- public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait)
+ public void sendExchangeDeclare(final AMQShortString name,
+ final AMQShortString type,
+ final AMQProtocolHandler protocolHandler, final boolean nowait)
throws AMQException, FailoverException
{
- getQpidSession().exchangeDeclare(name.toString(),
- type.toString(),
- null,
- null,
- name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
+ sendExchangeDeclare(name.asString(), type.asString(), null, null,
+ nowait);
+ }
+
+ public void sendExchangeDeclare(final String name, final String type,
+ final String alternateExchange, final Map<String, Object> args,
+ final boolean nowait) throws AMQException
+ {
+ getQpidSession().exchangeDeclare(
+ name,
+ type,
+ alternateExchange,
+ args,
+ name.toString().startsWith("amq.") ? Option.PASSIVE
+ : Option.NONE);
// We need to sync so that we get notify of an error.
if (!nowait)
{
@@ -598,28 +672,35 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
final boolean noLocal, final boolean nowait)
- throws AMQException, FailoverException
+ throws AMQException
{
- AMQShortString res;
+ AMQShortString queueName;
if (amqd.getAMQQueueName() == null)
{
// generate a name for this queue
- res = new AMQShortString("TempQueue" + UUID.randomUUID());
+ queueName = new AMQShortString("TempQueue" + UUID.randomUUID());
+ amqd.setQueueName(queueName);
}
else
{
- res = amqd.getAMQQueueName();
+ queueName = amqd.getAMQQueueName();
}
- Map<String,Object> arguments = null;
- if (noLocal)
- {
- arguments = new HashMap<String,Object>();
+
+ Map<String,Object> arguments = new HashMap<String,Object>();
+ if (noLocal || amqd.isNoLocal())
+ {
arguments.put("no-local", true);
}
- getQpidSession().queueDeclare(res.toString(), null, arguments,
+
+ if (amqd.getDestSyntax() == DestSyntax.ADDR && amqd.getQueueOptions() != null)
+ {
+ arguments.putAll(amqd.getQueueOptions());
+ }
+
+ getQpidSession().queueDeclare(queueName.toString(), amqd.getAlternateExchange() , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
amqd.isDurable() ? Option.DURABLE : Option.NONE,
- !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
if (!nowait)
{
@@ -627,7 +708,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().sync();
getCurrentException();
}
- return res;
+ return queueName;
}
/**
@@ -934,5 +1015,136 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
return AMQMessageDelegateFactory.FACTORY_0_10;
}
+
+ public boolean isExchangeExist(AMQDestination dest,boolean assertNode)
+ {
+ boolean match = true;
+ ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getName(), Option.NONE).get();
+ match = !result.getNotFound();
+
+ if (match && assertNode)
+ {
+ match = (result.getDurable() == dest.isDurable()) &&
+ (dest.getExchangeClass().asString().equals(result.getType())) &&
+ (matchProps(result.getArguments(),dest.getQueueOptions()));
+ }
+ if (match)
+ {
+ dest.setExchangeClass(new AMQShortString(result.getType()));
+ }
+
+ return match;
+ }
+
+ public boolean isQueueExist(AMQDestination dest,boolean assertNode)
+ {
+ boolean match = true;
+ QueueQueryResult result = getQpidSession().queueQuery(dest.getName(), Option.NONE).get();
+ match = dest.getName().equals(result.getQueue());
+
+ if (match && assertNode)
+ {
+ match = (result.getDurable() == dest.isDurable()) &&
+ (result.getAutoDelete() == dest.isAutoDelete()) &&
+ (result.getExclusive() == dest.isExclusive()) &&
+ (matchProps(result.getArguments(),dest.getQueueOptions()));
+ }
+
+ return match;
+ }
+
+ private boolean matchProps(Map<String,Object> target,Map<String,Object> source)
+ {
+ boolean match = true;
+ for (String key: source.keySet())
+ {
+ match = target.containsKey(key) &&
+ target.get(key).equals(source.get(key));
+
+ if (!match) return match;
+ }
+
+ return match;
+ }
+ public void handleAddressBasedDestination(AMQDestination dest,
+ boolean isConsumer,
+ boolean noWait) throws AMQException
+ {
+ boolean noLocal = dest.isNoLocal();
+ boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
+ (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getAssert() == AddressOption.SENDER);
+
+
+ if (isExchangeExist(dest,assertNode))
+ {
+ dest.setExchangeName(new AMQShortString(dest.getName()));
+ dest.setRoutingKey(new AMQShortString(dest.getSubject()));
+ if (isConsumer)
+ {
+ dest.setQueueName(null);
+ dest.addBinding(new Binding(dest.getName(),
+ dest.getSubject(),
+ null));
+ }
+ }
+ else if (isQueueExist(dest,assertNode))
+ {
+ dest.setQueueName(new AMQShortString(dest.getName()));
+ dest.setExchangeName(new AMQShortString(""));
+ dest.setExchangeClass(new AMQShortString(""));
+ dest.setRoutingKey(dest.getAMQQueueName());
+ }
+ else if (dest.getCreate() == AddressOption.ALWAYS ||
+ dest.getCreate() == AddressOption.RECEIVER && isConsumer ||
+ dest.getCreate() == AddressOption.SENDER && !isConsumer)
+ {
+ if (dest.getNodeType() == AMQDestination.QUEUE_TYPE)
+ {
+ dest.setQueueName(new AMQShortString(dest.getName()));
+ dest.setExchangeName(new AMQShortString(""));
+ dest.setExchangeClass(new AMQShortString(""));
+ dest.setRoutingKey(dest.getAMQQueueName());
+ }
+ else
+ {
+ dest.setQueueName(null);
+ dest.setExchangeName(new AMQShortString(dest.getName()));
+ dest.setExchangeClass(dest.getExchangeClass() == null?
+ ExchangeDefaults.TOPIC_EXCHANGE_CLASS:dest.getExchangeClass());
+ dest.setRoutingKey(new AMQShortString(dest.getSubject()));
+ dest.addBinding(new Binding(dest.getName(),
+ dest.getSubject(),
+ null));
+
+ sendExchangeDeclare(dest.getName(), dest.getExchangeClass().asString(),
+ dest.getAlternateExchange(), dest.getExchangeOptions(),false);
+
+ }
+
+ send0_10QueueDeclare(dest,null,noLocal,noWait);
+ }
+ else
+ {
+ throw new AMQException("The name supplied in the address doesn't resolve to an exchange or a queue");
+ }
+ }
+
+ /** This should be moved to a suitable utility class */
+ private String printMap(Map<String,Object> map)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("<");
+ if (map != null)
+ {
+ for(String key : map.keySet())
+ {
+ sb.append(key).append(" = ").append(map.get(key)).append(" ");
+ }
+ }
+ sb.append(">");
+ return sb.toString();
+ }
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 9b84421612..edcdbebba9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -590,4 +590,11 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
}
+ public void handleAddressBasedDestination(AMQDestination dest,
+ boolean isConsumer,
+ boolean noWait) throws AMQException
+ {
+ throw new UnsupportedOperationException("The new addressing based sytanx is "
+ + "not supported for AMQP 0-8/0-9 versions");
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index a1b5ce6f4c..4cc419b0cf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -17,34 +17,39 @@
*/
package org.apache.qpid.client;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+
import java.nio.ByteBuffer;
+import java.util.UUID;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.DeliveryMode;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.AMQDestination.AddressOption;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
+import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.Option;
import org.apache.qpid.util.Strings;
-import org.apache.qpid.njms.ExceptionHelper;
-import org.apache.qpid.transport.*;
-import static org.apache.qpid.transport.Option.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a 0_10 message producer.
*/
public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
+ private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_10.class);
private byte[] userIDBytes;
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
@@ -59,12 +64,27 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
void declareDestination(AMQDestination destination)
{
- String name = destination.getExchangeName().toString();
- ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
- (name,
- destination.getExchangeClass().toString(),
- null, null,
- name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
+ if (destination.getDestSyntax() == DestSyntax.BURL)
+ {
+ String name = destination.getExchangeName().toString();
+ ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare
+ (name,
+ destination.getExchangeClass().toString(),
+ null, null,
+ name.startsWith("amq.") ? Option.PASSIVE : Option.NONE);
+ }
+ else
+ {
+ try
+ {
+ getSession().handleAddressBasedDestination(destination,false,false);
+ }
+ catch(Exception e)
+ {
+ // Idealy this should be thrown to the JMS layer.
+ _logger.warn("Exception occured while verifying destination",e);
+ }
+ }
}
//--- Overwritten methods
@@ -136,7 +156,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority));
message.setJMSPriority(priority);
}
- String exchangeName = destination.getExchangeName().toString();
+ String exchangeName = destination.getExchangeName() == null ? "" : destination.getExchangeName().toString();
if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
{
deliveryProp.setExchange(exchangeName);
@@ -166,7 +186,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
org.apache.mina.common.ByteBuffer data = message.getData();
ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
- ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE,
+ ssn.messageTransfer(destination.getExchangeName() == null ? "" : destination.getExchangeName().toString(),
+ MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED,
new Header(deliveryProp, messageProps),
buffer, sync ? SYNC : NONE);
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index 9b2a6693e1..dd8377a94a 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -175,4 +175,12 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public void sync()
{
}
+
+ public void handleAddressBasedDestination(AMQDestination dest,
+ boolean isConsumer,
+ boolean noWait) throws AMQException
+ {
+ throw new UnsupportedOperationException("The new addressing based sytanx is "
+ + "not supported for AMQP 0-8/0-9 versions");
+ }
}