summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-23 15:31:04 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-23 15:31:04 +0000
commit672f8e531132cb7780d1d09b6eb3a3d5e6ba397e (patch)
treea446d3910989b4254517dcdb8dba18e950f96d12 /qpid/java/client/src/main
parent1044edb5417c80edae3f211ec38dae4a58c4a647 (diff)
downloadqpid-python-672f8e531132cb7780d1d09b6eb3a3d5e6ba397e.tar.gz
QPID-6037 : [Java Client] Add experimental support for ADDR addressing to the 0-8/9/9-1 client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620036 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src/main')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java223
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java198
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java340
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java40
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java40
8 files changed, 653 insertions, 248 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 5242629a91..9650ad76fb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -43,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -67,6 +68,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
private final AMQConnection _conn;
private boolean _messageCompressionSupported;
+ private boolean _addrSyntaxSupported;
public void closeConnection(long timeout) throws JMSException, AMQException
{
@@ -76,6 +78,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
public AMQConnectionDelegate_8_0(AMQConnection conn)
{
_conn = conn;
+ _addrSyntaxSupported =
+ Boolean.parseBoolean(System.getProperty(ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8,
+ String.valueOf(ClientProperties.DEFAULT_ADDR_SYNTAX_0_8_SUPPORT)));
}
protected boolean checkException(Throwable thrown)
@@ -429,4 +434,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return _messageCompressionSupported;
}
+
+ public boolean isAddrSyntaxSupported()
+ {
+ return _addrSyntaxSupported;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index e06fc0f1de..2714caf2a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -20,6 +20,20 @@
*/
package org.apache.qpid.client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Destination;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,20 +48,6 @@ import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLHelper;
-import javax.jms.Destination;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.net.URISyntaxException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
public abstract class AMQDestination implements Destination, Referenceable, Externalizable
{
@@ -813,7 +813,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
_address = addr;
}
- public int getAddressType(){
+ public int getAddressType()
+ {
return _addressType;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index c2659194e2..0b299a22cd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -68,9 +68,11 @@ import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.ListMessage;
@@ -79,6 +81,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.Strings;
/*
* TODO Different FailoverSupport implementation are needed on the same method call, in different situations. For
@@ -600,6 +603,128 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ public void setLegacyFieldsForQueueType(AMQDestination dest)
+ {
+ // legacy support
+ dest.setQueueName(new AMQShortString(dest.getAddressName()));
+ dest.setExchangeName(new AMQShortString(""));
+ dest.setExchangeClass(new AMQShortString(""));
+ dest.setRoutingKey(dest.getAMQQueueName());
+ }
+
+ public void setLegacyFieldsForTopicType(AMQDestination dest)
+ {
+ // legacy support
+ dest.setExchangeName(new AMQShortString(dest.getAddressName()));
+ Node node = dest.getNode();
+ dest.setExchangeClass(node.getExchangeType() == null?
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS):
+ new AMQShortString(node.getExchangeType()));
+ dest.setRoutingKey(new AMQShortString(dest.getSubject()));
+ }
+
+ protected void verifySubject(AMQDestination dest) throws AMQException
+ {
+ if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
+ {
+
+ if ("topic".equals(dest.getExchangeClass().toString()))
+ {
+ dest.setRoutingKey(new AMQShortString("#"));
+ dest.setSubject(dest.getRoutingKey().toString());
+ }
+ else
+ {
+ dest.setRoutingKey(new AMQShortString(""));
+ dest.setSubject("");
+ }
+ }
+ }
+
+ public abstract boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws AMQException;
+
+ public abstract boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException;
+
+ /**
+ * 1. Try to resolve the address type (queue or exchange)
+ * 2. if type == queue,
+ * 2.1 verify queue exists or create if create == true
+ * 2.2 If not throw exception
+ *
+ * 3. if type == exchange,
+ * 3.1 verify exchange exists or create if create == true
+ * 3.2 if not throw exception
+ * 3.3 if exchange exists (or created) create subscription queue.
+ */
+
+ @SuppressWarnings("deprecation")
+ public void resolveAddress(AMQDestination dest,
+ boolean isConsumer,
+ boolean noLocal) throws AMQException
+ {
+ if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
+ {
+ return;
+ }
+ else
+ {
+ boolean assertNode = (dest.getAssert() == AMQDestination.AddressOption.ALWAYS) ||
+ (isConsumer && dest.getAssert() == AMQDestination.AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getAssert() == AMQDestination.AddressOption.SENDER);
+
+ boolean createNode = (dest.getCreate() == AMQDestination.AddressOption.ALWAYS) ||
+ (isConsumer && dest.getCreate() == AMQDestination.AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getCreate() == AMQDestination.AddressOption.SENDER);
+
+
+
+ int type = resolveAddressType(dest);
+
+ switch (type)
+ {
+ case AMQDestination.QUEUE_TYPE:
+ {
+ if(createNode)
+ {
+ setLegacyFieldsForQueueType(dest);
+ handleQueueNodeCreation(dest,noLocal);
+ break;
+ }
+ else if (isQueueExist(dest,assertNode))
+ {
+ setLegacyFieldsForQueueType(dest);
+ break;
+ }
+ }
+
+ case AMQDestination.TOPIC_TYPE:
+ {
+ if(createNode)
+ {
+ setLegacyFieldsForTopicType(dest);
+ verifySubject(dest);
+ handleExchangeNodeCreation(dest);
+ break;
+ }
+ else if (isExchangeExist(dest,assertNode))
+ {
+ setLegacyFieldsForTopicType(dest);
+ verifySubject(dest);
+ break;
+ }
+ }
+
+ default:
+ throw new AMQException(
+ "The name '" + dest.getAddressName() +
+ "' supplied in the address doesn't resolve to an exchange or a queue");
+ }
+ dest.setAddressResolved(System.currentTimeMillis());
+ }
+ }
+
+ public abstract int resolveAddressType(AMQDestination dest) throws AMQException;
+
protected abstract void acknowledgeImpl() throws JMSException;
/**
@@ -2594,6 +2719,54 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ void handleLinkCreation(AMQDestination dest) throws AMQException
+ {
+ createBindings(dest, dest.getLink().getBindings());
+ }
+
+
+ void createBindings(AMQDestination dest, List<AMQDestination.Binding> bindings) throws AMQException
+ {
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (AMQDestination.Binding binding: bindings)
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ doBind(dest, binding, queue, exchange);
+ }
+ }
+
+ protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException;
+
+ abstract void handleExchangeNodeCreation(AMQDestination dest) throws AMQException;
+
+ abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange)
+ throws AMQException;
+
public abstract void sendConsume(C consumer, AMQShortString queueName,
boolean nowait, int tag) throws AMQException, FailoverException;
@@ -2696,7 +2869,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @throws AMQException If the exchange cannot be declared for any reason.
* TODO Be aware of possible changes to parameter order as versions change.
*/
- private void declareExchange(final AMQShortString name, final AMQShortString type,
+ void declareExchange(final AMQShortString name, final AMQShortString type,
final boolean nowait, final boolean durable,
final boolean autoDelete, final boolean internal) throws AMQException
{
@@ -2710,9 +2883,53 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}, _connection).execute();
}
+ void declareExchange(final AMQShortString name, final AMQShortString type,
+ final boolean nowait, final boolean durable,
+ final boolean autoDelete, final FieldTable arguments,
+ final boolean passive) throws AMQException
+ {
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendExchangeDeclare(name, type, nowait, durable, autoDelete, arguments, passive);
+ return null;
+ }
+ }, _connection).execute();
+ }
+
+ protected AMQShortString preprocessAddressTopic(final C consumer,
+ AMQShortString queueName) throws AMQException
+ {
+ if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
+ {
+ if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
+ {
+ String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
+
+ createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
+ queueName = consumer.getDestination().getAMQQueueName();
+ consumer.setQueuename(queueName);
+ }
+ handleLinkCreation(consumer.getDestination());
+ }
+ return queueName;
+ }
+
+ abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException;
+
public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException;
+
+ public abstract void sendExchangeDeclare(final AMQShortString name,
+ final AMQShortString type,
+ final boolean nowait,
+ boolean durable,
+ boolean autoDelete,
+ FieldTable arguments,
+ final boolean passive) throws AMQException, FailoverException;
+
/**
* Declares a queue for a JMS destination.
* <p>
@@ -2930,10 +3147,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey)
throws AMQException;
- public abstract void resolveAddress(AMQDestination dest,
- boolean isConsumer,
- boolean noLocal) throws AMQException;
-
private void registerProducer(long producerId, MessageProducer producer)
{
_producers.put(producerId, producer);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 19720ea386..46f999e452 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
@@ -57,7 +56,6 @@ import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
@@ -395,10 +393,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
" exchange: " + exchange +
" using binding key " + binding.getBindingKey() +
" with args " + Strings.printMap(binding.getArgs()));
- getQpidSession().exchangeBind(queue,
- exchange,
- binding.getBindingKey(),
- binding.getArgs());
+ doBind(destination, binding, queue, exchange);
}
}
@@ -639,18 +634,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
boolean nowait, int tag)
throws AMQException, FailoverException
{
- if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
- {
- if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
- {
- String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
-
- createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
- queueName = consumer.getDestination().getAMQQueueName();
- consumer.setQueuename(queueName);
- }
- handleLinkCreation(consumer.getDestination());
- }
+ queueName = preprocessAddressTopic(consumer, queueName);
boolean preAcquire = consumer.isPreAcquire();
AMQDestination destination = consumer.getDestination();
@@ -728,6 +712,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete);
}
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException
+ {
+ sendExchangeDeclare(name.asString(), type.asString(), null,
+ arguments == null ? null : FieldTableSupport.convertToMap(arguments),
+ nowait, durable, autoDelete);
+ }
+
+
public void sendExchangeDeclare(final String name, final String type,
final String alternateExchange, final Map<String, Object> args,
final boolean nowait, boolean durable, boolean autoDelete) throws AMQException
@@ -1109,6 +1102,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return AMQMessageDelegateFactory.FACTORY_0_10;
}
+ @Override
public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
{
boolean match = true;
@@ -1144,6 +1138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
+ @Override
public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
{
Node node = dest.getNode();
@@ -1218,84 +1213,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
- /**
- * 1. Try to resolve the address type (queue or exchange)
- * 2. if type == queue,
- * 2.1 verify queue exists or create if create == true
- * 2.2 If not throw exception
- *
- * 3. if type == exchange,
- * 3.1 verify exchange exists or create if create == true
- * 3.2 if not throw exception
- * 3.3 if exchange exists (or created) create subscription queue.
- */
-
- @SuppressWarnings("deprecation")
- public void resolveAddress(AMQDestination dest,
- boolean isConsumer,
- boolean noLocal) throws AMQException
- {
- if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
- {
- return;
- }
- else
- {
- boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getAssert() == AddressOption.SENDER);
-
- boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getCreate() == AddressOption.SENDER);
-
-
-
- int type = resolveAddressType(dest);
-
- switch (type)
- {
- case AMQDestination.QUEUE_TYPE:
- {
- if(createNode)
- {
- setLegacyFieldsForQueueType(dest);
- handleQueueNodeCreation(dest,noLocal);
- break;
- }
- else if (isQueueExist(dest,assertNode))
- {
- setLegacyFieldsForQueueType(dest);
- break;
- }
- }
-
- case AMQDestination.TOPIC_TYPE:
- {
- if(createNode)
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- handleExchangeNodeCreation(dest);
- break;
- }
- else if (isExchangeExist(dest,assertNode))
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- break;
- }
- }
-
- default:
- throw new AMQException(
- "The name '" + dest.getAddressName() +
- "' supplied in the address doesn't resolve to an exchange or a queue");
- }
- dest.setAddressResolved(System.currentTimeMillis());
- }
- }
-
+ @Override
public int resolveAddressType(AMQDestination dest) throws AMQException
{
int type = dest.getAddressType();
@@ -1325,24 +1243,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void verifySubject(AMQDestination dest) throws AMQException
- {
- if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
- {
-
- if ("topic".equals(dest.getExchangeClass().toString()))
- {
- dest.setRoutingKey(new AMQShortString("#"));
- dest.setSubject(dest.getRoutingKey().toString());
- }
- else
- {
- dest.setRoutingKey(new AMQShortString(""));
- dest.setSubject("");
- }
- }
- }
-
+ @Override
void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
{
Link link = dest.getLink();
@@ -1380,26 +1281,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
bindingArguments);
}
- public void setLegacyFieldsForQueueType(AMQDestination dest)
- {
- // legacy support
- dest.setQueueName(new AMQShortString(dest.getAddressName()));
- dest.setExchangeName(new AMQShortString(""));
- dest.setExchangeClass(new AMQShortString(""));
- dest.setRoutingKey(dest.getAMQQueueName());
- }
-
- public void setLegacyFiledsForTopicType(AMQDestination dest)
- {
- // legacy support
- dest.setExchangeName(new AMQShortString(dest.getAddressName()));
- Node node = dest.getNode();
- dest.setExchangeClass(node.getExchangeType() == null?
- AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS):
- new AMQShortString(node.getExchangeType()));
- dest.setRoutingKey(new AMQShortString(dest.getSubject()));
- }
-
protected void acknowledgeImpl()
{
RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags());
@@ -1488,7 +1369,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
+ @Override
+ protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
{
Node node = dest.getNode();
Map<String,Object> arguments = node.getDeclareArgs();
@@ -1506,6 +1388,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sync();
}
+ @Override
void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
{
Node node = dest.getNode();
@@ -1523,47 +1406,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sync();
}
- void handleLinkCreation(AMQDestination dest) throws AMQException
- {
- createBindings(dest, dest.getLink().getBindings());
- }
-
- void createBindings(AMQDestination dest, List<Binding> bindings)
+ protected void doBind(final AMQDestination dest, final Binding binding, final String queue, final String exchange)
{
- String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
- .getAddressName() : "amq.topic";
-
- String defaultQueueName = null;
- if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
- {
- defaultQueueName = dest.getQueueName();
- }
- else
- {
- defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
- }
-
- for (Binding binding: bindings)
- {
- String queue = binding.getQueue() == null?
- defaultQueueName: binding.getQueue();
-
- String exchange = binding.getExchange() == null ?
- defaultExchangeForBinding :
- binding.getExchange();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Binding queue : " + queue +
- " exchange: " + exchange +
- " using binding key " + binding.getBindingKey() +
- " with args " + Strings.printMap(binding.getArgs()));
- }
- getQpidSession().exchangeBind(queue,
- exchange,
- binding.getBindingKey(),
- binding.getArgs());
- }
+ getQpidSession().exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
}
void handleLinkDelete(AMQDestination dest) throws AMQException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index dbbc300910..e5ca82f56a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -48,10 +49,14 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.messaging.address.AddressHelper;
+import org.apache.qpid.client.messaging.address.Link;
+import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
@@ -230,9 +235,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
table.setObject(entry.getKey(), entry.getValue());
}
}
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
- AMQFrame queueDeclare = body.generateFrame(getChannelId());
- getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ sendQueueDeclare(name, durable, exclusive, autoDelete, table, false);
}
public void sendRecover() throws AMQException, FailoverException
@@ -428,6 +431,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return (responseBody.getReplyCode() == 0);
}
+
+ protected boolean exchangeExists(final AMQShortString exchangeName)
+ throws AMQException
+ {
+ if(!getAMQConnection().getDelegate().supportsIsBound())
+ {
+ return false;
+ }
+
+ AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>(
+ new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+ {
+ public AMQMethodEvent execute() throws AMQException, FailoverException
+ {
+ return sendExchangeBound(exchangeName, null, null);
+
+ }
+ }, getAMQConnection()).execute();
+
+ // Extract and return the response code from the query.
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+
+ // valid if no issues, or just no bindings
+ return (responseBody.getReplyCode() == 0 || responseBody.getReplyCode() == 3);
+ }
+
private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName,
AMQShortString routingKey,
AMQShortString queueName) throws AMQException, FailoverException
@@ -444,6 +473,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
boolean nowait,
int tag) throws AMQException, FailoverException
{
+ queueName = preprocessAddressTopic(consumer, queueName);
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
@@ -468,6 +498,63 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
@Override
+ void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
+ {
+ final Link link = dest.getLink();
+ final String queueName ;
+
+ if (dest.getQueueName() == null)
+ {
+ queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName();
+ dest.setQueueName(new AMQShortString(queueName));
+ }
+ else
+ {
+ queueName = dest.getQueueName();
+ }
+
+ final Link.SubscriptionQueue queueProps = link.getSubscriptionQueue();
+ final Map<String,Object> arguments = queueProps.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+
+ if (link.isDurable() && queueName.startsWith("TempQueue"))
+ {
+ throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link.");
+ }
+
+ (new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ // not setting alternate exchange
+ sendQueueDeclare(AMQShortString.valueOf(queueName),
+ link.isDurable(),
+ queueProps.isExclusive(),
+ queueProps.isAutoDelete(),
+ FieldTable.convertToFieldTable(arguments),
+ false);
+
+ return null;
+ }
+ }, getAMQConnection())).execute();
+
+
+ Map<String,Object> bindingArguments = new HashMap<String, Object>();
+ bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+
+ bindQueue(AMQShortString.valueOf(queueName),
+ AMQShortString.valueOf(dest.getSubject()),
+ FieldTable.convertToFieldTable(bindingArguments),
+ AMQShortString.valueOf(dest.getAddressName()),dest,false);
+
+ }
+
+ @Override
public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
{
@@ -481,17 +568,52 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
+ @Override
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException
+ {
+ //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ExchangeDeclareBody body = methodRegistry.createExchangeDeclareBody(getTicket(),
+ name,
+ type,
+ passive || name.toString().startsWith("amq."),
+ durable,
+ autoDelete,
+ false,
+ false,
+ arguments);
+ AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
+
+ getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+
private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException
{
+ AMQShortString queueName = amqd.getAMQQueueName();
+ boolean durable = amqd.isDurable();
+ boolean exclusive = amqd.isExclusive();
+ boolean autoDelete = amqd.isAutoDelete();
+ FieldTable arguments = null;
+ sendQueueDeclare(queueName, durable, exclusive, autoDelete, arguments, passive);
+ }
+
+ private void sendQueueDeclare(final AMQShortString queueName,
+ final boolean durable,
+ final boolean exclusive,
+ final boolean autoDelete, final FieldTable arguments, final boolean passive)
+ throws AMQException, FailoverException
+ {
QueueDeclareBody body =
getMethodRegistry().createQueueDeclareBody(getTicket(),
- amqd.getAMQQueueName(),
+ queueName,
passive,
- amqd.isDurable(),
- amqd.isExclusive(),
- amqd.isAutoDelete(),
+ durable,
+ exclusive,
+ autoDelete,
false,
- null);
+ arguments);
AMQFrame queueDeclare = body.generateFrame(getChannelId());
@@ -733,13 +855,207 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
}
- public void resolveAddress(AMQDestination dest,
- boolean isConsumer,
- boolean noLocal) throws AMQException
+ @Override
+ public void resolveAddress(final AMQDestination dest, final boolean isConsumer, final boolean noLocal)
+ throws AMQException
+ {
+ if(!isAddrSyntaxSupported())
+ {
+ throw new UnsupportedAddressSyntaxException(dest);
+ }
+ super.resolveAddress(dest, isConsumer, noLocal);
+ }
+
+ private boolean isAddrSyntaxSupported()
+ {
+ return ((AMQConnectionDelegate_8_0)(getAMQConnection().getDelegate())).isAddrSyntaxSupported();
+ }
+
+ public int resolveAddressType(AMQDestination dest) throws AMQException
+ {
+ int type = dest.getAddressType();
+ String name = dest.getAddressName();
+ if (type != AMQDestination.UNKNOWN_TYPE)
+ {
+ return type;
+ }
+ else
+ {
+ boolean isExchange = exchangeExists(AMQShortString.valueOf(name));
+ boolean isQueue = isBound(null,AMQShortString.valueOf(name), null);
+
+ if (!isExchange && !isQueue)
+ {
+ type = dest instanceof AMQTopic ? AMQDestination.TOPIC_TYPE : AMQDestination.QUEUE_TYPE;
+ }
+ else if (!isExchange)
+ {
+ //name refers to a queue
+ type = AMQDestination.QUEUE_TYPE;
+ }
+ else if (!isQueue)
+ {
+ //name refers to an exchange
+ type = AMQDestination.TOPIC_TYPE;
+ }
+ else
+ {
+ //both a queue and exchange exist for that name
+ throw new AMQException("Ambiguous address, please specify queue or topic as node type");
+ }
+ dest.setAddressType(type);
+ return type;
+ }
+ }
+
+ protected void handleQueueNodeCreation(final AMQDestination dest, boolean noLocal) throws AMQException
+ {
+ final Node node = dest.getNode();
+ final Map<String,Object> arguments = node.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+
+ (new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ sendQueueDeclare(AMQShortString.valueOf(dest.getAddressName()),
+ node.isDurable(),
+ node.isExclusive(),
+ node.isAutoDelete(),
+ FieldTable.convertToFieldTable(arguments),
+ false);
+
+ return null;
+ }
+ }, getAMQConnection())).execute();
+
+
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+ void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
+ {
+ Node node = dest.getNode();
+ // can't set alt. exchange
+ declareExchange(AMQShortString.valueOf(dest.getAddressName()),
+ AMQShortString.valueOf(node.getExchangeType()),
+ false,
+ node.isDurable(),
+ node.isAutoDelete(),
+ FieldTable.convertToFieldTable(node.getDeclareArgs()), false);
+
+ // If bindings are specified without a queue name and is called by the producer,
+ // the broker will send an exception as expected.
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+
+ protected void doBind(final AMQDestination dest,
+ final AMQDestination.Binding binding,
+ final String queue,
+ final String exchange) throws AMQException
+ {
+ bindQueue(AMQShortString.valueOf(queue),AMQShortString.valueOf(binding.getBindingKey()),
+ FieldTable.convertToFieldTable(binding.getArgs()),
+ AMQShortString.valueOf(exchange),dest);
+ }
+
+ public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
+ {
+ Node node = dest.getNode();
+ return isQueueExist(dest.getAddressName(), assertNode,
+ node.isDurable(), node.isAutoDelete(),
+ node.isExclusive(), node.getDeclareArgs());
+ }
+
+ public boolean isQueueExist(final String queueName, boolean assertNode,
+ final boolean durable, final boolean autoDelete,
+ final boolean exclusive, final Map<String, Object> args) throws AMQException
{
- throw new UnsupportedAddressSyntaxException(dest);
+ boolean match = isBound(null,AMQShortString.valueOf(queueName), null);
+
+ if (assertNode)
+ {
+ if(!match)
+ {
+ throw new AMQException("Assert failed for queue : " + queueName +". Queue does not exist." );
+
+ }
+ else
+ {
+
+ new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ sendQueueDeclare(AMQShortString.valueOf(queueName),
+ durable,
+ exclusive,
+ autoDelete,
+ FieldTable.convertToFieldTable(args),
+ true);
+
+ return null;
+ }
+ }, getAMQConnection());
+
+ }
+ }
+
+
+ return match;
}
+ public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
+ {
+ boolean match = exchangeExists(AMQShortString.valueOf(dest.getAddressName()));
+
+ Node node = dest.getNode();
+
+ if (match)
+ {
+ if (assertNode)
+ {
+
+ declareExchange(AMQShortString.valueOf(dest.getAddressName()),
+ AMQShortString.valueOf(node.getExchangeType()),
+ false,
+ node.isDurable(),
+ node.isAutoDelete(),
+ FieldTable.convertToFieldTable(node.getDeclareArgs()), true);
+
+ }
+ else
+ {
+ // TODO - some way to determine the exchange type
+ /*
+ _logger.debug("Setting Exchange type " + result.getType());
+ node.setExchangeType(result.getType());
+ dest.setExchangeClass(new AMQShortString(result.getType()));
+ */
+
+ }
+ }
+
+ if (assertNode)
+ {
+ if (!match)
+ {
+ throw new AMQException("Assert failed for address : " + dest +". Exchange not found.");
+ }
+ }
+
+ return match;
+ }
protected void flushAcknowledgments()
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index f735895c81..23d65e15d8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +41,6 @@ import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.ConnectionURL;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
private final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -71,6 +71,19 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
_topicDestinationCache = session.getTopicDestinationCache();
_queueDestinationCache = session.getQueueDestinationCache();
+
+ // This is due to the Destination carrying the temporary subscription name which is incorrect.
+ if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
+ {
+ boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ;
+
+ if (!namedQueue)
+ {
+ setDestination(destination.copyDestination());
+ getDestination().setQueueName(null);
+ }
+ }
+
if (destination.getRejectBehaviour() != null)
{
_rejectBehaviour = destination.getRejectBehaviour();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 355c456249..89bf146398 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -57,30 +57,34 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory);
}
- void declareDestination(AMQDestination destination)
+ void declareDestination(AMQDestination destination) throws AMQException
{
if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
- throw new UnsupportedAddressSyntaxException(destination);
+ getSession().resolveAddress(destination, false, false);
}
-
- if(getSession().isDeclareExchanges())
+ else
{
- final MethodRegistry methodRegistry = getSession().getMethodRegistry();
- ExchangeDeclareBody body =
- methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
- destination.getExchangeName(),
- destination.getExchangeClass(),
- destination.getExchangeName().toString().startsWith("amq."),
- destination.isExchangeDurable(),
- destination.isExchangeAutoDelete(),
- destination.isExchangeInternal(),
- true,
- null);
- AMQFrame declare = body.generateFrame(getChannelId());
-
- getConnection().getProtocolHandler().writeFrame(declare);
+ if (getSession().isDeclareExchanges())
+ {
+ final MethodRegistry methodRegistry = getSession().getMethodRegistry();
+ ExchangeDeclareBody body =
+ methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
+ destination.getExchangeName(),
+ destination.getExchangeClass(),
+ destination.getExchangeName()
+ .toString()
+ .startsWith("amq."),
+ destination.isExchangeDurable(),
+ destination.isExchangeAutoDelete(),
+ destination.isExchangeInternal(),
+ true,
+ null);
+ AMQFrame declare = body.generateFrame(getChannelId());
+
+ getConnection().getProtocolHandler().writeFrame(declare);
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index ad9a37479e..bd089eb6a8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -21,6 +21,23 @@
package org.apache.qpid.client.message;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,7 +45,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Message;
@@ -42,22 +58,6 @@ import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.ReplyTo;
import org.apache.qpid.transport.TransportException;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotWriteableException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
/**
* This extends AbstractAMQMessageDelegate which contains common code between
* both the 0_8 and 0_10 Message types.
@@ -352,14 +352,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
try
{
- int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
+ int type = getAMQSession().resolveAddressType(amqd);
if (type == AMQDestination.QUEUE_TYPE)
{
- ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd);
+ getAMQSession().setLegacyFieldsForQueueType(amqd);
}
else
{
- ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd);
+ getAMQSession().setLegacyFieldsForTopicType(amqd);
}
}
catch(AMQException ex)