diff options
Diffstat (limited to 'java')
4 files changed, 222 insertions, 47 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java index b6e433f532..a201f7d61e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java @@ -22,6 +22,10 @@ package org.apache.qpid.client; import java.net.URISyntaxException; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Topic; + import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.messaging.Address; import org.apache.qpid.url.BindingURL; @@ -35,7 +39,7 @@ import org.apache.qpid.url.BindingURL; * The abstract class AMQDestination has most of the functionality * to support any destination defined in AMQP 0-10 spec. */ -public class AMQAnyDestination extends AMQDestination +public class AMQAnyDestination extends AMQDestination implements Queue, Topic { public AMQAnyDestination(BindingURL binding) { @@ -66,4 +70,8 @@ public class AMQAnyDestination extends AMQDestination return getAMQQueueName() == null; } + public String getTopicName() throws JMSException + { + return super.getRoutingKey().toString(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 55812f8e01..4e8c5836e0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -151,6 +151,11 @@ public abstract class AMQDestination implements Destination, Referenceable } + public static DestSyntax getDefaultDestSyntax() + { + return defaultDestSyntax; + } + protected AMQDestination(Address address) throws Exception { this._address = address; @@ -159,21 +164,41 @@ public abstract class AMQDestination implements Destination, Referenceable _logger.debug("Based on " + address + " the selected destination syntax is " + _destSyntax); } - protected AMQDestination(String str) throws URISyntaxException + public static DestSyntax getDestType(String str) { if (str.startsWith("BURL:") || - (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL)) + (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL)) + { + return DestSyntax.BURL; + } + else + { + return DestSyntax.ADDR; + } + } + + public static String stripSyntaxPrefix(String str) + { + if (str.startsWith("BURL:") || str.startsWith("ADDR:")) + { + return str.substring(5,str.length()); + } + else + { + return str; + } + } + + protected AMQDestination(String str) throws URISyntaxException + { + _destSyntax = getDestType(str); + str = stripSyntaxPrefix(str); + if (_destSyntax == DestSyntax.BURL) { - if (str.startsWith("BURL:")) - { - str = str.substring(5,str.length()); - } - _destSyntax = DestSyntax.BURL; getInfoFromBindingURL(new AMQBindingURL(str)); } else { - _destSyntax = DestSyntax.ADDR; this._address = createAddressFromString(str); try { @@ -654,13 +679,10 @@ public abstract class AMQDestination implements Destination, Referenceable public static Destination createDestination(String str) throws Exception { - if (str.startsWith("BURL:") || - (!str.startsWith("ADDR:") && defaultDestSyntax == DestSyntax.BURL)) + DestSyntax syntax = getDestType(str); + str = stripSyntaxPrefix(str); + if (syntax == DestSyntax.BURL) { - if (str.startsWith("BURL:")) - { - str = str.substring(5,str.length()); - } return createDestination(new AMQBindingURL(str)); } else @@ -738,6 +760,10 @@ public abstract class AMQDestination implements Destination, Referenceable public AddressOption getCreate() { return _create; } + + public void setCreate(AddressOption option) { + _create = option; + } public AddressOption getAssert() { return _assert; @@ -794,11 +820,7 @@ public abstract class AMQDestination implements Destination, Referenceable private static Address createAddressFromString(String str) { - if (str.startsWith("ADDR:")) - { - str = str.substring(5,str.length()); - } - return Address.parse(str); + return Address.parse(str); } private void getInfoFromAddress() throws Exception diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 10ee8a8a0d..55b56444de 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1081,26 +1081,38 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public Queue createQueue(String queueName) throws JMSException { checkNotClosed(); - - if (queueName.indexOf('/') == -1) - { - return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(queueName)); - } - else + try { - try + if (queueName.indexOf('/') == -1 && queueName.indexOf(';') == -1) { - return new AMQQueue(queueName); + DestSyntax syntax = AMQDestination.getDestType(queueName); + if (syntax == AMQDestination.DestSyntax.BURL) + { + // For testing we may want to use the prefix + return new AMQQueue(getDefaultQueueExchangeName(), + new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName))); + } + else + { + AMQQueue queue = new AMQQueue(queueName); + queue.setCreate(AddressOption.ALWAYS); + return queue; + + } } - catch (URISyntaxException urlse) + else { - _logger.error("", urlse); - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - jmse.initCause(urlse); - throw jmse; + return new AMQQueue(queueName); } } + catch (URISyntaxException urlse) + { + _logger.error("", urlse); + JMSException jmse = new JMSException(urlse.getReason()); + jmse.setLinkedException(urlse); + jmse.initCause(urlse); + throw jmse; + } } @@ -1352,24 +1364,35 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public Topic createTopic(String topicName) throws JMSException { checkNotClosed(); - - if (topicName.indexOf('/') == -1) - { - return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); - } - else + try { - try + if (topicName.indexOf('/') == -1 && topicName.indexOf(';') == -1) { - return new AMQTopic(topicName); + DestSyntax syntax = AMQDestination.getDestType(topicName); + // for testing we may want to use the prefix to indicate our choice. + topicName = AMQDestination.stripSyntaxPrefix(topicName); + if (syntax == AMQDestination.DestSyntax.BURL) + { + return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); + } + else + { + return new AMQTopic("ADDR:" + getDefaultTopicExchangeName() + "/" + topicName); + } } - catch (URISyntaxException urlse) + else { - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - jmse.initCause(urlse); - throw jmse; + return new AMQTopic(topicName); } + + } + catch (URISyntaxException urlse) + { + _logger.error("", urlse); + JMSException jmse = new JMSException(urlse.getReason()); + jmse.setLinkedException(urlse); + jmse.initCause(urlse); + throw jmse; } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 2fc83fa026..aabeba3c63 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -33,6 +33,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import javax.naming.Context; import org.apache.qpid.client.AMQAnyDestination; @@ -456,6 +457,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } + /** + * Test goal: Verifies that and address based destination can be used successfully + * as a reply to. + */ public void testAddressBasedReplyTo() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); @@ -484,4 +489,121 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase Message replyToMsg = replyToCons.receive(1000); assertNotNull("The reply to consumer should have got the message",replyToMsg); } + + /** + * Test goal: Verifies that session.createQueue method + * works as expected both with the new and old addressing scheme. + */ + public void testSessionCreateQueue() throws Exception + { + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + + // Using the BURL method + Destination queue = ssn.createQueue("my-queue"); + MessageProducer prod = ssn.createProducer(queue); + MessageConsumer cons = ssn.createConsumer(queue); + assertTrue("my-queue was not created as expected",( + (AMQSession_0_10)ssn).isQueueBound("amq.direct", + "my-queue","my-queue", null)); + + prod.send(ssn.createTextMessage("test")); + assertNotNull("consumer should receive a message",cons.receive(1000)); + cons.close(); + + // Using the ADDR method + queue = ssn.createQueue("ADDR:my-queue2"); + prod = ssn.createProducer(queue); + cons = ssn.createConsumer(queue); + assertTrue("my-queue2 was not created as expected",( + (AMQSession_0_10)ssn).isQueueBound("", + "my-queue2","my-queue2", null)); + + prod.send(ssn.createTextMessage("test")); + assertNotNull("consumer should receive a message",cons.receive(1000)); + cons.close(); + + // Using the ADDR method to create a more complicated queue + String addr = "ADDR:amq.direct/x512; {create: receiver, " + + "link : {name : 'MY.RESP.QUEUE', " + + "x-declare : { auto-delete: true, exclusive: true, " + + "'qpid.max_size': 1000, 'qpid.policy_type': ring } } }"; + queue = ssn.createQueue(addr); + + prod = ssn.createProducer(queue); + cons = ssn.createConsumer(queue); + assertTrue("MY.RESP.QUEUE was not created as expected",( + (AMQSession_0_10)ssn).isQueueBound("amq.direct", + "MY.RESP.QUEUE","x512", null)); + cons.close(); + } + + /** + * Test goal: Verifies that session.creatTopic method + * works as expected both with the new and old addressing scheme. + */ + public void testSessionCreateTopic() throws Exception + { + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + + // Using the BURL method + Topic topic = ssn.createTopic("ACME"); + MessageProducer prod = ssn.createProducer(topic); + MessageConsumer cons = ssn.createConsumer(topic); + + prod.send(ssn.createTextMessage("test")); + assertNotNull("consumer should receive a message",cons.receive(1000)); + cons.close(); + + // Using the ADDR method + topic = ssn.createTopic("ADDR:ACME"); + prod = ssn.createProducer(topic); + cons = ssn.createConsumer(topic); + + prod.send(ssn.createTextMessage("test")); + assertNotNull("consumer should receive a message",cons.receive(1000)); + cons.close(); + + String addr = "ADDR:vehicles/bus; " + + "{ " + + "create: always, " + + "node: " + + "{" + + "type: topic, " + + "x-declare: " + + "{ " + + "type:direct, " + + "auto-delete: true, " + + "'qpid.msg_sequence': 1, " + + "'qpid.ive': 1" + + "}" + + "}, " + + "link: {name : my-topic, " + + "x-bindings: [{exchange : 'vehicles', key : car}, " + + "{exchange : 'vehicles', key : van}]" + + "}" + + "}"; + + // Using the ADDR method to create a more complicated topic + topic = ssn.createTopic(addr); + prod = ssn.createProducer(topic); + cons = ssn.createConsumer(topic); + + assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( + (AMQSession_0_10)ssn).isQueueBound("vehicles", + "my-topic","bus", null)); + + assertTrue("The queue was not bound to vehicle exchange using car as the binding key",( + (AMQSession_0_10)ssn).isQueueBound("vehicles", + "my-topic","car", null)); + + assertTrue("The queue was not bound to vehicle exchange using van as the binding key",( + (AMQSession_0_10)ssn).isQueueBound("vehicles", + "my-topic","van", null)); + + Message msg = ssn.createTextMessage("test"); + msg.setStringProperty("qpid.subject", "van"); + prod.send(msg); + assertNotNull("consumer should receive a message",cons.receive(1000)); + cons.close(); + } } |
