summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java60
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java77
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java122
4 files changed, 222 insertions, 47 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
index b6e433f532..a201f7d61e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
@@ -22,6 +22,10 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Topic;
+
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.url.BindingURL;
@@ -35,7 +39,7 @@ import org.apache.qpid.url.BindingURL;
* The abstract class AMQDestination has most of the functionality
* to support any destination defined in AMQP 0-10 spec.
*/
-public class AMQAnyDestination extends AMQDestination
+public class AMQAnyDestination extends AMQDestination implements Queue, Topic
{
public AMQAnyDestination(BindingURL binding)
{
@@ -66,4 +70,8 @@ public class AMQAnyDestination extends AMQDestination
return getAMQQueueName() == null;
}
+ public String getTopicName() throws JMSException
+ {
+ return super.getRoutingKey().toString();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 55812f8e01..4e8c5836e0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -151,6 +151,11 @@ public abstract class AMQDestination implements Destination, Referenceable
}
+ public static DestSyntax getDefaultDestSyntax()
+ {
+ return defaultDestSyntax;
+ }
+
protected AMQDestination(Address address) throws Exception
{
this._address = address;
@@ -159,21 +164,41 @@ public abstract class AMQDestination implements Destination, Referenceable
_logger.debug("Based on " + address + " the selected destination syntax is " + _destSyntax);
}
- protected AMQDestination(String str) throws URISyntaxException
+ public static DestSyntax getDestType(String str)
{
if (str.startsWith("BURL:") ||
- (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL))
+ (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL))
+ {
+ return DestSyntax.BURL;
+ }
+ else
+ {
+ return DestSyntax.ADDR;
+ }
+ }
+
+ public static String stripSyntaxPrefix(String str)
+ {
+ if (str.startsWith("BURL:") || str.startsWith("ADDR:"))
+ {
+ return str.substring(5,str.length());
+ }
+ else
+ {
+ return str;
+ }
+ }
+
+ protected AMQDestination(String str) throws URISyntaxException
+ {
+ _destSyntax = getDestType(str);
+ str = stripSyntaxPrefix(str);
+ if (_destSyntax == DestSyntax.BURL)
{
- if (str.startsWith("BURL:"))
- {
- str = str.substring(5,str.length());
- }
- _destSyntax = DestSyntax.BURL;
getInfoFromBindingURL(new AMQBindingURL(str));
}
else
{
- _destSyntax = DestSyntax.ADDR;
this._address = createAddressFromString(str);
try
{
@@ -654,13 +679,10 @@ public abstract class AMQDestination implements Destination, Referenceable
public static Destination createDestination(String str) throws Exception
{
- if (str.startsWith("BURL:") ||
- (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL))
+ DestSyntax syntax = getDestType(str);
+ str = stripSyntaxPrefix(str);
+ if (syntax == DestSyntax.BURL)
{
- if (str.startsWith("BURL:"))
- {
- str = str.substring(5,str.length());
- }
return createDestination(new AMQBindingURL(str));
}
else
@@ -738,6 +760,10 @@ public abstract class AMQDestination implements Destination, Referenceable
public AddressOption getCreate() {
return _create;
}
+
+ public void setCreate(AddressOption option) {
+ _create = option;
+ }
public AddressOption getAssert() {
return _assert;
@@ -794,11 +820,7 @@ public abstract class AMQDestination implements Destination, Referenceable
private static Address createAddressFromString(String str)
{
- if (str.startsWith("ADDR:"))
- {
- str = str.substring(5,str.length());
- }
- return Address.parse(str);
+ return Address.parse(str);
}
private void getInfoFromAddress() throws Exception
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 10ee8a8a0d..55b56444de 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1081,26 +1081,38 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public Queue createQueue(String queueName) throws JMSException
{
checkNotClosed();
-
- if (queueName.indexOf('/') == -1)
- {
- return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName));
- }
- else
+ try
{
- try
+ if (queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1)
{
- return new AMQQueue(queueName);
+ DestSyntax syntax = AMQDestination.getDestType(queueName);
+ if (syntax == AMQDestination.DestSyntax.BURL)
+ {
+ // For testing we may want to use the prefix
+ return new AMQQueue(getDefaultQueueExchangeName(),
+ new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName)));
+ }
+ else
+ {
+ AMQQueue queue = new AMQQueue(queueName);
+ queue.setCreate(AddressOption.ALWAYS);
+ return queue;
+
+ }
}
- catch (URISyntaxException urlse)
+ else
{
- _logger.error("", urlse);
- JMSException jmse = new JMSException(urlse.getReason());
- jmse.setLinkedException(urlse);
- jmse.initCause(urlse);
- throw jmse;
+ return new AMQQueue(queueName);
}
}
+ catch (URISyntaxException urlse)
+ {
+ _logger.error("", urlse);
+ JMSException jmse = new JMSException(urlse.getReason());
+ jmse.setLinkedException(urlse);
+ jmse.initCause(urlse);
+ throw jmse;
+ }
}
@@ -1352,24 +1364,35 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public Topic createTopic(String topicName) throws JMSException
{
checkNotClosed();
-
- if (topicName.indexOf('/') == -1)
- {
- return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
- }
- else
+ try
{
- try
+ if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1)
{
- return new AMQTopic(topicName);
+ DestSyntax syntax = AMQDestination.getDestType(topicName);
+ // for testing we may want to use the prefix to indicate our choice.
+ topicName = AMQDestination.stripSyntaxPrefix(topicName);
+ if (syntax == AMQDestination.DestSyntax.BURL)
+ {
+ return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
+ }
+ else
+ {
+ return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + topicName);
+ }
}
- catch (URISyntaxException urlse)
+ else
{
- JMSException jmse = new JMSException(urlse.getReason());
- jmse.setLinkedException(urlse);
- jmse.initCause(urlse);
- throw jmse;
+ return new AMQTopic(topicName);
}
+
+ }
+ catch (URISyntaxException urlse)
+ {
+ _logger.error("", urlse);
+ JMSException jmse = new JMSException(urlse.getReason());
+ jmse.setLinkedException(urlse);
+ jmse.initCause(urlse);
+ throw jmse;
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 2fc83fa026..aabeba3c63 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -33,6 +33,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import javax.naming.Context;
import org.apache.qpid.client.AMQAnyDestination;
@@ -456,6 +457,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
+ /**
+ * Test goal: Verifies that and address based destination can be used successfully
+ * as a reply to.
+ */
public void testAddressBasedReplyTo() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -484,4 +489,121 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
Message replyToMsg = replyToCons.receive(1000);
assertNotNull("The reply to consumer should have got the message",replyToMsg);
}
+
+ /**
+ * Test goal: Verifies that session.createQueue method
+ * works as expected both with the new and old addressing scheme.
+ */
+ public void testSessionCreateQueue() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ // Using the BURL method
+ Destination queue = ssn.createQueue("my-queue");
+ MessageProducer prod = ssn.createProducer(queue);
+ MessageConsumer cons = ssn.createConsumer(queue);
+ assertTrue("my-queue was not created as expected",(
+ (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ "my-queue","my-queue", null));
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ // Using the ADDR method
+ queue = ssn.createQueue("ADDR:my-queue2");
+ prod = ssn.createProducer(queue);
+ cons = ssn.createConsumer(queue);
+ assertTrue("my-queue2 was not created as expected",(
+ (AMQSession_0_10)ssn).isQueueBound("",
+ "my-queue2","my-queue2", null));
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ // Using the ADDR method to create a more complicated queue
+ String addr = "ADDR:amq.direct/x512; {create: receiver, " +
+ "link : {name : 'MY.RESP.QUEUE', " +
+ "x-declare : { auto-delete: true, exclusive: true, " +
+ "'qpid.max_size': 1000, 'qpid.policy_type': ring } } }";
+ queue = ssn.createQueue(addr);
+
+ prod = ssn.createProducer(queue);
+ cons = ssn.createConsumer(queue);
+ assertTrue("MY.RESP.QUEUE was not created as expected",(
+ (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ "MY.RESP.QUEUE","x512", null));
+ cons.close();
+ }
+
+ /**
+ * Test goal: Verifies that session.creatTopic method
+ * works as expected both with the new and old addressing scheme.
+ */
+ public void testSessionCreateTopic() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ // Using the BURL method
+ Topic topic = ssn.createTopic("ACME");
+ MessageProducer prod = ssn.createProducer(topic);
+ MessageConsumer cons = ssn.createConsumer(topic);
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ // Using the ADDR method
+ topic = ssn.createTopic("ADDR:ACME");
+ prod = ssn.createProducer(topic);
+ cons = ssn.createConsumer(topic);
+
+ prod.send(ssn.createTextMessage("test"));
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+
+ String addr = "ADDR:vehicles/bus; " +
+ "{ " +
+ "create: always, " +
+ "node: " +
+ "{" +
+ "type: topic, " +
+ "x-declare: " +
+ "{ " +
+ "type:direct, " +
+ "auto-delete: true, " +
+ "'qpid.msg_sequence': 1, " +
+ "'qpid.ive': 1" +
+ "}" +
+ "}, " +
+ "link: {name : my-topic, " +
+ "x-bindings: [{exchange : 'vehicles', key : car}, " +
+ "{exchange : 'vehicles', key : van}]" +
+ "}" +
+ "}";
+
+ // Using the ADDR method to create a more complicated topic
+ topic = ssn.createTopic(addr);
+ prod = ssn.createProducer(topic);
+ cons = ssn.createConsumer(topic);
+
+ assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ "my-topic","bus", null));
+
+ assertTrue("The queue was not bound to vehicle exchange using car as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ "my-topic","car", null));
+
+ assertTrue("The queue was not bound to vehicle exchange using van as the binding key",(
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ "my-topic","van", null));
+
+ Message msg = ssn.createTextMessage("test");
+ msg.setStringProperty("qpid.subject", "van");
+ prod.send(msg);
+ assertNotNull("consumer should receive a message",cons.receive(1000));
+ cons.close();
+ }
}