summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-07-16 23:50:13 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-07-16 23:50:13 +0000
commitdcfa11df9462a49d8baf9500a8b3d8a3ce5d979f (patch)
tree441643f3a5d17107deb0f5a5241c71a640942f93
parent15504d31d088348681be4d30ef3302982267ecb1 (diff)
downloadqpid-python-dcfa11df9462a49d8baf9500a8b3d8a3ce5d979f.tar.gz
QPID-2736
The session.createQueue and session.createTopic methods will behave as follows. session.createQueue =================== 1. If just a queue name is passed, 1.1 If the destination syntax is BURL, a queue by that name will be created and bound to the amq.direct exchange using the queue name as the binding key. If published using this destination, then the message will be sent to amq.direct with routing key set to the queue name. 1.2 If destination syntax is ADDR, a queue is created by that name. If published using this exchange then the message will be sent to the defualt exchange (nameless exchange) with routing key set to the queue name. 2. If an address string or binding url is passed, it will be passed accordingly. session.createTopic =================== 1. If just a topic name is passed, 1.1 If destination syntax is ADDR, a temp queue is created and bound to the amq.topic with the topic name as binding key. If published using this destination, then the message will be sent to amq.topic with the topic name set to routing key. The topic name will also be set as "qpid.subject" in the outgoing message. 1.2 Same as above except there will be no subject set. 2. If an address string or binding url is passed, it will be passed accordingly. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@964984 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java60
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java77
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java122
4 files changed, 222 insertions, 47 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
index b6e433f532..a201f7d61e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
@@ -22,6 +22,10 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Topic;
+
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.BindingURL;
@@ -35,7 +39,7 @@ import org.apache.qpid.url.BindingURL;
* The abstract class AMQDestination has most of the functionality
* to support any destination defined in AMQP 0-10 spec.
*/
-public class AMQAnyDestination extends AMQDestination
+public class AMQAnyDestination extends AMQDestination implements Queue, Topic
{
public AMQAnyDestination(BindingURL binding)
{
@@ -66,4 +70,8 @@ public class AMQAnyDestination extends AMQDestination
return getAMQQueueName() == null;
}
+ public String getTopicName() throws JMSException
+ {
+ return super.getRoutingKey().toString();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 55812f8e01..4e8c5836e0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -151,6 +151,11 @@ public abstract class AMQDestination implements Destination, Referenceable
}
+ public static DestSyntax getDefaultDestSyntax()
+ {
+ return defaultDestSyntax;
+ }
+
protected AMQDestination(Address address) throws Exception
{
this._address = address;
@@ -159,21 +164,41 @@ public abstract class AMQDestination implements Destination, Referenceable
_logger.debug("Based on " + address + " the selected destination syntax is " + _destSyntax);
}
- protected AMQDestination(String str) throws URISyntaxException
+ public static DestSyntax getDestType(String str)
{
if (str.startsWith("BURL:") ||
- (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL))
+ (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL))
+ {
+ return DestSyntax.BURL;
+ }
+ else
+ {
+ return DestSyntax.ADDR;
+ }
+ }
+
+ public static String stripSyntaxPrefix(String str)
+ {
+ if (str.startsWith("BURL:") || str.startsWith("ADDR:"))
+ {
+ return str.substring(5,str.length());
+ }
+ else
+ {
+ return str;
+ }
+ }
+
+ protected AMQDestination(String str) throws URISyntaxException
+ {
+ _destSyntax = getDestType(str);
+ str = stripSyntaxPrefix(str);
+ if (_destSyntax == DestSyntax.BURL)
{
- if (str.startsWith("BURL:"))
- {
- str = str.substring(5,str.length());
- }
- _destSyntax = DestSyntax.BURL;
getInfoFromBindingURL(new AMQBindingURL(str));
}
else
{
- _destSyntax = DestSyntax.ADDR;
this._address = createAddressFromString(str);
try
{
@@ -654,13 +679,10 @@ public abstract class AMQDestination implements Destination, Referenceable
public static Destination createDestination(String str) throws Exception
{
- if (str.startsWith("BURL:") ||
- (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL))
+ DestSyntax syntax = getDestType(str);
+ str = stripSyntaxPrefix(str);
+ if (syntax == DestSyntax.BURL)
{
- if (str.startsWith("BURL:"))
- {
- str = str.substring(5,str.length());
- }
return createDestination(new AMQBindingURL(str));
}
else
@@ -738,6 +760,10 @@ public abstract class AMQDestination implements Destination, Referenceable
public AddressOption getCreate() {
return _create;
}
+
+ public void setCreate(AddressOption option) {
+ _create = option;
+ }
public AddressOption getAssert() {
return _assert;
@@ -794,11 +820,7 @@ public abstract class AMQDestination implements Destination, Referenceable
private static Address createAddressFromString(String str)
{
- if (str.startsWith("ADDR:"))
- {
- str = str.substring(5,str.length());
- }
- return Address.parse(str);
+ return Address.parse(str);
}
private void getInfoFromAddress() throws Exception
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 10ee8a8a0d..55b56444de 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1081,26 +1081,38 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public Queue createQueue(String queueName) throws JMSException
{
checkNotClosed();
-
- if (queueName.indexOf('/') == -1)
- {
- return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName));
- }
- else
+ try
{
- try
+ if (queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1)
{
- return new AMQQueue(queueName);
+ DestSyntax syntax = AMQDestination.getDestType(queueName);
+ if (syntax == AMQDestination.DestSyntax.BURL)
+ {
+ // For testing we may want to use the prefix
+ return new AMQQueue(getDefaultQueueExchangeName(),
+ new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName)));
+ }
+ else
+ {
+ AMQQueue queue = new AMQQueue(queueName);
+ queue.setCreate(AddressOption.ALWAYS);
+ return queue;
+
+ }
}
- catch (URISyntaxException urlse)
+ else
{
- _logger.error("", urlse);
- JMSException jmse = new JMSException(urlse.getReason());
- jmse.setLinkedException(urlse);
- jmse.initCause(urlse);
- throw jmse;
+ return new AMQQueue(queueName);
}
}
+ catch (URISyntaxException urlse)
+ {
+ _logger.error("", urlse);
+ JMSException jmse = new JMSException(urlse.getReason());
+ jmse.setLinkedException(urlse);
+ jmse.initCause(urlse);
+ throw jmse;
+ }
}
@@ -1352,24 +1364,35 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public Topic createTopic(String topicName) throws JMSException
{
checkNotClosed();
-
- if (topicName.indexOf('/') == -1)
- {
- return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
- }
- else
+ try
{
- try
+ if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1)
{
- return new AMQTopic(topicName);
+ DestSyntax syntax = AMQDestination.getDestType(topicName);
+ // for testing we may want to use the prefix to indicate our choice.
+ topicName = AMQDestination.stripSyntaxPrefix(topicName);
+ if (syntax == AMQDestination.DestSyntax.BURL)
+ {
+ return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
+ }
+ else
+ {
+ return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + topicName);
+ }
}
- catch (URISyntaxException urlse)
+ else
{
- JMSException jmse = new JMSException(urlse.getReason());
- jmse.setLinkedException(urlse);
- jmse.initCause(urlse);
- throw jmse;
+ return new AMQTopic(topicName);
}
+
+ }
+ catch (URISyntaxException urlse)
+ {
+ _logger.error("", urlse);
+ JMSException jmse = new JMSException(urlse.getReason());
+ jmse.setLinkedException(urlse);
+ jmse.initCause(urlse);
+ throw jmse;
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 2fc83fa026..aabeba3c63 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -33,6 +33,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import javax.naming.Context;
import org.apache.qpid.client.AMQAnyDestination;
@@ -456,6 +457,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
+ /**
+ * Test goal: Verifies that and address based destination can be used successfully
+ * as a reply to.
+ */
public void testAddressBasedReplyTo() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -484,4 +489,121 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
Message replyToMsg = replyToCons.receive(1000);
assertNotNull("The reply to consumer should have got the message",replyToMsg);
}
+
+ /**
+ * Test goal: Verifies that session.createQueue method
+ * works as expected both with the new and old addressing scheme.
+ */
+ public void testSessionCreateQueue() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ // Using the BURL method
+ Destination queue = ssn.createQueue("my-queue");
+ MessageProducer prod = ssn.createProducer(queue);
+ MessageConsumer cons = ssn.createConsumer(queue);
+ assertTrue("my-queue was not created as expected",(
+ (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ "my-queue","my-queue", null));
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ // Using the ADDR method
+ queue = ssn.createQueue("ADDR:my-queue2");
+ prod = ssn.createProducer(queue);
+ cons = ssn.createConsumer(queue);
+ assertTrue("my-queue2 was not created as expected",(
+ (AMQSession_0_10)ssn).isQueueBound("",
+ "my-queue2","my-queue2", null));
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ // Using the ADDR method to create a more complicated queue
+ String addr = "ADDR:amq.direct/x512; {create: receiver, " +
+ "link : {name : 'MY.RESP.QUEUE', " +
+ "x-declare : { auto-delete: true, exclusive: true, " +
+ "'qpid.max_size': 1000, 'qpid.policy_type': ring } } }";
+ queue = ssn.createQueue(addr);
+
+ prod = ssn.createProducer(queue);
+ cons = ssn.createConsumer(queue);
+ assertTrue("MY.RESP.QUEUE was not created as expected",(
+ (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ "MY.RESP.QUEUE","x512", null));
+ cons.close();
+ }
+
+ /**
+ * Test goal: Verifies that session.creatTopic method
+ * works as expected both with the new and old addressing scheme.
+ */
+ public void testSessionCreateTopic() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ // Using the BURL method
+ Topic topic = ssn.createTopic("ACME");
+ MessageProducer prod = ssn.createProducer(topic);
+ MessageConsumer cons = ssn.createConsumer(topic);
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ // Using the ADDR method
+ topic = ssn.createTopic("ADDR:ACME");
+ prod = ssn.createProducer(topic);
+ cons = ssn.createConsumer(topic);
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ String addr = "ADDR:vehicles/bus; " +
+ "{ " +
+ "create: always, " +
+ "node: " +
+ "{" +
+ "type: topic, " +
+ "x-declare: " +
+ "{ " +
+ "type:direct, " +
+ "auto-delete: true, " +
+ "'qpid.msg_sequence': 1, " +
+ "'qpid.ive': 1" +
+ "}" +
+ "}, " +
+ "link: {name : my-topic, " +
+ "x-bindings: [{exchange : 'vehicles', key : car}, " +
+ "{exchange : 'vehicles', key : van}]" +
+ "}" +
+ "}";
+
+ // Using the ADDR method to create a more complicated topic
+ topic = ssn.createTopic(addr);
+ prod = ssn.createProducer(topic);
+ cons = ssn.createConsumer(topic);
+
+ assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ "my-topic","bus", null));
+
+ assertTrue("The queue was not bound to vehicle exchange using car as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ "my-topic","car", null));
+
+ assertTrue("The queue was not bound to vehicle exchange using van as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ "my-topic","van", null));
+
+ Message msg = ssn.createTextMessage("test");
+ msg.setStringProperty("qpid.subject", "van");
+ prod.send(msg);
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+ }
}