diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
| commit | 1f92ceb67b4c717d425cad75c5ecde8e08f7874e (patch) | |
| tree | e0b5a10c8c765bc52d5abc4c1674dc9da61cdffe /java/systests | |
| parent | 690b61e5fe6cb9ee60406eacffb7584c1f3a1a83 (diff) | |
| download | qpid-python-1f92ceb67b4c717d425cad75c5ecde8e08f7874e.tar.gz | |
QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1488561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests')
6 files changed, 345 insertions, 303 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index 3d116f1b1b..91f56f369b 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -83,9 +83,12 @@ public class ReturnUnroutableMandatoryMessageTest extends QpidBrokerTestCase imp AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); + FieldTable ft = new FieldTable(); ft.setString("F1000", "1"); - consumer = consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT) /2 , false, false, (String) null, ft); + consumerSession.declareAndBind(queue, ft); + + consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java index dfd26b474a..646c17d1f2 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java @@ -111,7 +111,7 @@ public class BindingLoggingTest extends AbstractTestLogging String messageID = "BND-1001"; String queueName = _queue.getQueueName(); String exchange = "direct/amq.direct"; - String message = "Create : Arguments : {x-filter-jms-selector=}"; + String message = "Create"; validateLogMessage(getLogMessage(results, 0), messageID, message, exchange, queueName, queueName); } @@ -145,7 +145,7 @@ public class BindingLoggingTest extends AbstractTestLogging // Perform full testing on the binding String message = getMessageString(fromMessage(getLogMessage(results, 0))); - + validateLogMessage(getLogMessage(results, 0), messageID, message, "topic/amq.topic", "topic", "clientid:" + getName()); @@ -208,17 +208,17 @@ public class BindingLoggingTest extends AbstractTestLogging validateMessageID(messageID, log); String subject = fromSubject(log); - + validateBindingDeleteArguments(subject, "/test"); assertEquals("Log Message not as expected", message, getMessageString(fromMessage(log))); } - + private void validateBindingDeleteArguments(String subject, String vhostName) { String routingKey = AbstractTestLogSubject.getSlice("rk", subject); - + assertTrue("Routing Key does not start with TempQueue:"+routingKey, routingKey.startsWith("TempQueue")); assertEquals("Virtualhost not correct.", vhostName, diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 3783b0bd02..67a2988ad1 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,16 +7,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ package org.apache.qpid.test.client.destination; @@ -49,7 +49,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class); private Connection _connection; - + @Override public void setUp() throws Exception { @@ -57,20 +57,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase _connection = getConnection() ; _connection.start(); } - + @Override public void tearDown() throws Exception { _connection.close(); super.tearDown(); } - + public void testCreateOptions() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageProducer prod; MessageConsumer cons; - + // default (create never, assert never) ------------------- // create never -------------------------------------------- String addr1 = "ADDR:testQueue1"; @@ -84,7 +84,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + try { prod = jmsSession.createProducer(dest); @@ -94,22 +94,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest,false)); - - + + // create always ------------------------------------------- addr1 = "ADDR:testQueue1; { create: always }"; dest = new AMQAnyDestination(addr1); - cons = jmsSession.createConsumer(dest); - + cons = jmsSession.createConsumer(dest); + assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + // create receiver ----------------------------------------- addr1 = "ADDR:testQueue2; { create: receiver }"; dest = new AMQAnyDestination(addr1); @@ -122,32 +122,32 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - - - cons = jmsSession.createConsumer(dest); - + + + cons = jmsSession.createConsumer(dest); + assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + // create never -------------------------------------------- addr1 = "ADDR:testQueue3; { create: never }"; dest = new AMQAnyDestination(addr1); try { - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); } catch(JMSException e) { assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + try { prod = jmsSession.createProducer(dest); @@ -157,17 +157,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - + // create sender ------------------------------------------ addr1 = "ADDR:testQueue3; { create: sender }"; dest = new AMQAnyDestination(addr1); - + try { - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); } catch(JMSException e) { @@ -176,84 +176,84 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - + prod = jmsSession.createProducer(dest); assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + } - + public void testCreateQueue() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + String addr = "ADDR:my-queue/hello; " + - "{" + + "{" + "create: always, " + - "node: " + - "{" + + "node: " + + "{" + "durable: true ," + "x-declare: " + - "{" + + "{" + "exclusive: true," + - "arguments: {" + + "arguments: {" + "'qpid.max_size': 1000," + "'qpid.max_count': 100" + - "}" + - "}, " + - "x-bindings: [{exchange : 'amq.direct', key : test}, " + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', key : test}, " + "{exchange : 'amq.fanout'}," + "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," + "{exchange : 'amq.topic', key : 'a.#'}" + - "]," + - + "]," + + "}" + "}"; AMQDestination dest = new AMQAnyDestination(addr); - MessageConsumer cons = jmsSession.createConsumer(dest); + MessageConsumer cons = jmsSession.createConsumer(dest); cons.close(); - + // Even if the consumer is closed the queue and the bindings should be intact. - + assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", dest.getAddressName(),"test", null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", + (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", dest.getAddressName(),null, null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", - dest.getAddressName(),"a.#", null)); - + (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + dest.getAddressName(),"a.#", null)); + Map<String,Object> args = new HashMap<String,Object>(); args.put("x-match","any"); args.put("dep","sales"); args.put("loc","CA"); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, args)); - + MessageProducer prod = jmsSession.createProducer(dest); prod.send(jmsSession.createTextMessage("test")); - + MessageConsumer cons2 = jmsSession.createConsumer(jmsSession.createQueue("ADDR:my-queue")); Message m = cons2.receive(1000); assertNotNull("Should receive message sent to my-queue",m); assertEquals("The subject set in the message is incorrect","hello",m.getStringProperty(QpidMessageProperties.QPID_SUBJECT)); } - + public void testCreateExchange() throws Exception { createExchangeImpl(false, false); @@ -283,21 +283,21 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - String addr = "ADDR:my-exchange/hello; " + - "{ " + - "create: always, " + - "node: " + + String addr = "ADDR:my-exchange/hello; " + + "{ " + + "create: always, " + + "node: " + "{" + "type: topic, " + "x-declare: " + - "{ " + - "type:direct, " + + "{ " + + "type:direct, " + "auto-delete: true" + createExchangeArgsString(withExchangeArgs, useNonsenseArguments) + "}" + "}" + "}"; - + AMQDestination dest = new AMQAnyDestination(addr); MessageConsumer cons; @@ -322,20 +322,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("Unexpected exception whilst creating consumer: " + e); } } - + assertTrue("Exchange not created as expected",( (AMQSession_0_10)jmsSession).isExchangeExist(dest,true)); - + // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", - dest.getQueueName(),"hello", Collections.<String, Object>emptyMap())); - + (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", + dest.getQueueName(),"hello", null)); + // The client should be able to query and verify the existence of my-exchange (QPID-2774) dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}"); - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); } - + private String createExchangeArgsString(final boolean withExchangeArgs, final boolean useNonsenseArguments) { @@ -366,60 +366,60 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { assertTrue("Queue not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", - dest.getAddressName(),"test", null)); - + (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + dest.getAddressName(),"test", null)); + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", dest.getAddressName(),"a.#", null)); - + Address a = Address.parse(headersBinding); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, a.getOptions())); } - + /** * Test goal: Verifies that a producer and consumer creation triggers the correct * behavior for x-bindings specified in node props. */ public void testBindQueueWithArgs() throws Exception { - + Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}"; - - String addr = "node: " + - "{" + + + String addr = "node: " + + "{" + "durable: true ," + - "x-declare: " + - "{ " + + "x-declare: " + + "{ " + "auto-delete: true," + "arguments: {'qpid.max_count': 100}" + "}, " + "x-bindings: [{exchange : 'amq.direct', key : test}, " + - "{exchange : 'amq.topic', key : 'a.#'}," + - headersBinding + + "{exchange : 'amq.topic', key : 'a.#'}," + + headersBinding + "]" + "}" + "}"; - + AMQDestination dest1 = new AMQAnyDestination("ADDR:my-queue/hello; {create: receiver, " +addr); - MessageConsumer cons = jmsSession.createConsumer(dest1); - checkQueueForBindings(jmsSession,dest1,headersBinding); - + MessageConsumer cons = jmsSession.createConsumer(dest1); + checkQueueForBindings(jmsSession,dest1,headersBinding); + AMQDestination dest2 = new AMQAnyDestination("ADDR:my-queue2/hello; {create: sender, " +addr); - MessageProducer prod = jmsSession.createProducer(dest2); - checkQueueForBindings(jmsSession,dest2,headersBinding); + MessageProducer prod = jmsSession.createProducer(dest2); + checkQueueForBindings(jmsSession,dest2,headersBinding); } - + /** * Test goal: Verifies the capacity property in address string is handled properly. * Test strategy: @@ -427,22 +427,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase * Creates consumer with client ack. * Sends 15 messages to the queue, tries to receive 10. * Tries to receive the 11th message and checks if its null. - * - * Since capacity is 10 and we haven't acked any messages, + * + * Since capacity is 10 and we haven't acked any messages, * we should not have received the 11th. - * + * * Acks the 10th message and verifies we receive the rest of the msgs. */ public void testCapacity() throws Exception { verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: 10}}"); } - + public void testSourceAndTargetCapacity() throws Exception { verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: {source:10, target:15} }}"); } - + private void verifyCapacity(String address) throws Exception { if (!isCppBroker()) @@ -450,13 +450,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase _logger.info("Not C++ broker, exiting test"); return; } - + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - + AMQDestination dest = new AMQAnyDestination(address); - MessageConsumer cons = jmsSession.createConsumer(dest); + MessageConsumer cons = jmsSession.createConsumer(dest); MessageProducer prod = jmsSession.createProducer(dest); - + for (int i=0; i< 15; i++) { prod.send(jmsSession.createTextMessage("msg" + i) ); @@ -475,48 +475,48 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT)); } } - + /** * Test goal: Verifies if the new address format based destinations * can be specified and loaded correctly from the properties file. - * + * */ public void testLoadingFromPropertiesFile() throws Exception { - Hashtable<String,String> map = new Hashtable<String,String>(); - map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + + Hashtable<String,String> map = new Hashtable<String,String>(); + map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + "{x-declare: {auto-delete: true, arguments : {'qpid.max_size': 1000}}}}"); - + map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }"); map.put("destination.myQueue3", "BURL:direct://amq.direct/my-queue3?routingkey='test'"); - + PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory(); Context ctx = props.getInitialContext(map); - - AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1"); + + AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1"); AMQDestination dest2 = (AMQDestination)ctx.lookup("myQueue2"); AMQDestination dest3 = (AMQDestination)ctx.lookup("myQueue3"); - + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - MessageConsumer cons1 = jmsSession.createConsumer(dest1); + MessageConsumer cons1 = jmsSession.createConsumer(dest1); MessageConsumer cons2 = jmsSession.createConsumer(dest2); MessageConsumer cons3 = jmsSession.createConsumer(dest3); - + assertTrue("Destination1 was not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest1, true)); - + assertTrue("Destination1 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest1.getAddressName(),dest1.getAddressName(), null)); - + assertTrue("Destination2 was not created as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest2,true)); - + assertTrue("Destination2 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest2.getAddressName(),dest2.getAddressName(), null)); - + MessageProducer producer = jmsSession.createProducer(dest3); producer.send(jmsSession.createTextMessage("Hello")); TextMessage msg = (TextMessage)cons3.receive(1000); @@ -527,31 +527,31 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase * Test goal: Verifies the subject can be overridden using "qpid.subject" message property. * Test strategy: Creates and address with a default subject "topic1" * Creates a message with "qpid.subject"="topic2" and sends it. - * Verifies that the message goes to "topic2" instead of "topic1". + * Verifies that the message goes to "topic2" instead of "topic1". */ public void testOverridingSubject() throws Exception { Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - + AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}"); - + MessageProducer prod = jmsSession.createProducer(topic1); - + Message m = jmsSession.createTextMessage("Hello"); m.setStringProperty("qpid.subject", "topic2"); - + MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1); MessageConsumer consForTopic2 = jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}")); - + prod.send(m); Message msg = consForTopic1.receive(1000); assertNull("message shouldn't have been sent to topic1",msg); - + msg = consForTopic2.receive(1000); - assertNotNull("message should have been sent to topic2",msg); - + assertNotNull("message should have been sent to topic2",msg); + } - + /** * Test goal: Verifies that session.createQueue method * works as expected both with the new and old addressing scheme. @@ -559,19 +559,19 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testSessionCreateQueue() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + // Using the BURL method Destination queue = ssn.createQueue("my-queue"); - MessageProducer prod = ssn.createProducer(queue); + MessageProducer prod = ssn.createProducer(queue); MessageConsumer cons = ssn.createConsumer(queue); assertTrue("my-queue was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession_0_10)ssn).isQueueBound("amq.direct", "my-queue","my-queue", null)); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method // default case queue = ssn.createQueue("ADDR:my-queue2"); @@ -586,34 +586,34 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "doesn't resolve to an exchange or a queue"; assertEquals(s,e.getCause().getCause().getMessage()); } - + // explicit create case queue = ssn.createQueue("ADDR:my-queue2; {create: sender}"); - prod = ssn.createProducer(queue); + prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); assertTrue("my-queue2 was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("", + (AMQSession_0_10)ssn).isQueueBound("", "my-queue2","my-queue2", null)); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method to create a more complicated queue String addr = "ADDR:amq.direct/x512; {" + - "link : {name : 'MY.RESP.QUEUE', " + + "link : {name : 'MY.RESP.QUEUE', " + "x-declare : { auto-delete: true, exclusive: true, " + "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }"; queue = ssn.createQueue(addr); - + cons = ssn.createConsumer(queue); prod = ssn.createProducer(queue); assertTrue("MY.RESP.QUEUE was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession_0_10)ssn).isQueueBound("amq.direct", "MY.RESP.QUEUE","x512", null)); cons.close(); } - + /** * Test goal: Verifies that session.creatTopic method works as expected * both with the new and old addressing scheme. @@ -635,68 +635,68 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase private void sessionCreateTopicImpl(boolean withExchangeArgs) throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + // Using the BURL method Topic topic = ssn.createTopic("ACME"); - MessageProducer prod = ssn.createProducer(topic); + MessageProducer prod = ssn.createProducer(topic); MessageConsumer cons = ssn.createConsumer(topic); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method topic = ssn.createTopic("ADDR:ACME"); - prod = ssn.createProducer(topic); + prod = ssn.createProducer(topic); cons = ssn.createConsumer(topic); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - String addr = "ADDR:vehicles/bus; " + - "{ " + - "create: always, " + - "node: " + + String addr = "ADDR:vehicles/bus; " + + "{ " + + "create: always, " + + "node: " + "{" + "type: topic, " + "x-declare: " + - "{ " + - "type:direct, " + + "{ " + + "type:direct, " + "auto-delete: true" + createExchangeArgsString(withExchangeArgs, false) + "}" + "}, " + "link: {name : my-topic, " + "x-bindings: [{exchange : 'vehicles', key : car}, " + - "{exchange : 'vehicles', key : van}]" + - "}" + + "{exchange : 'vehicles', key : van}]" + + "}" + "}"; - + // Using the ADDR method to create a more complicated topic topic = ssn.createTopic(addr); cons = ssn.createConsumer(topic); prod = ssn.createProducer(topic); - + assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","bus", null)); - + assertTrue("The queue was not bound to vehicle exchange using car as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","car", null)); - + assertTrue("The queue was not bound to vehicle exchange using van as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","van", null)); - + Message msg = ssn.createTextMessage("test"); msg.setStringProperty("qpid.subject", "van"); prod.send(msg); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); } - + /** * Test Goal : Verify the default subjects used for each exchange type. * The default for amq.topic is "#" and for the rest it's "" @@ -704,92 +704,92 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testDefaultSubjects() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct")); MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic")); - + MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct")); MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather")); MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales")); - + queueProducer.send(ssn.createBytesMessage()); assertNotNull("The consumer subscribed to amq.direct " + "with empty binding key should have received the message ",queueCons.receive(1000)); - + topicProducer1.send(ssn.createTextMessage("25c")); assertEquals("The consumer subscribed to amq.topic " + "with '#' binding key should have received the message ", ((TextMessage)topicCons.receive(1000)).getText(),"25c"); - + topicProducer2.send(ssn.createTextMessage("1000")); assertEquals("The consumer subscribed to amq.topic " + "with '#' binding key should have received the message ", ((TextMessage)topicCons.receive(1000)).getText(),"1000"); } - + /** * Test Goal : Verify that 'mode : browse' works as expected using a regular consumer. * This indirectly tests ring queues as well. */ public void testBrowseMode() throws Exception { - + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + String addr = "ADDR:my-ring-queue; {create: always, mode: browse, " + "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " + "x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2}}}}"; - + Destination dest = ssn.createQueue(addr); MessageConsumer browseCons = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test")); - + prod.send(ssn.createTextMessage("Test1")); prod.send(ssn.createTextMessage("Test2")); - + TextMessage msg = (TextMessage)browseCons.receive(1000); assertEquals("Didn't receive the first message",msg.getText(),"Test1"); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Didn't receive the first message",msg.getText(),"Test2"); - - browseCons.close(); + + browseCons.close(); prod.send(ssn.createTextMessage("Test3")); browseCons = ssn.createConsumer(dest); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Should receive the second message again",msg.getText(),"Test2"); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Should receive the third message since it's a ring queue",msg.getText(),"Test3"); - + assertNull("Should not receive anymore messages",browseCons.receive(500)); } - + /** * Test Goal : When the same destination is used when creating two consumers, - * If the type == topic, verify that unique subscription queues are created, + * If the type == topic, verify that unique subscription queues are created, * unless subscription queue has a name. - * + * * If the type == queue, same queue should be shared. */ public void testSubscriptionForSameDestination() throws Exception { - Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination dest = ssn.createTopic("ADDR:amq.topic/foo"); MessageConsumer consumer1 = ssn.createConsumer(dest); MessageConsumer consumer2 = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(dest); - + prod.send(ssn.createTextMessage("A")); TextMessage m = (TextMessage)consumer1.receive(1000); assertEquals("Consumer1 should recieve message A",m.getText(),"A"); m = (TextMessage)consumer2.receive(1000); assertEquals("Consumer2 should recieve message A",m.getText(),"A"); - + consumer1.close(); consumer2.close(); - + dest = ssn.createTopic("ADDR:amq.topic/foo; { link: {name: my-queue}}"); consumer1 = ssn.createConsumer(dest); try @@ -798,61 +798,61 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); } catch(Exception e) - { + { } _connection.close(); - + _connection = getConnection() ; _connection.start(); - ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); dest = ssn.createTopic("ADDR:my_queue; {create: always}"); consumer1 = ssn.createConsumer(dest); consumer2 = ssn.createConsumer(dest); prod = ssn.createProducer(dest); - + prod.send(ssn.createTextMessage("A")); - Message m1 = consumer1.receive(1000); + Message m1 = consumer1.receive(1000); Message m2 = consumer2.receive(1000); - + if (m1 != null) { - assertNull("Only one consumer should receive the message",m2); + assertNull("Only one consumer should receive the message",m2); } else { - assertNotNull("Only one consumer should receive the message",m2); + assertNotNull("Only one consumer should receive the message",m2); } } - + public void testXBindingsWithoutExchangeName() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String addr = "ADDR:MRKT; " + "{" + - "create: receiver," + + "create: receiver," + "node : {type: topic, x-declare: {type: topic} }," + "link:{" + "name: my-topic," + "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" + "}" + "}"; - + // Using the ADDR method to create a more complicated topic Topic topic = ssn.createTopic(addr); MessageConsumer cons = ssn.createConsumer(topic); - + assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","NYSE.#", null)); - + assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","NASDAQ.#", null)); - + assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","CNTL.#", null)); - + MessageProducer prod = ssn.createProducer(topic); Message msg = ssn.createTextMessage("test"); msg.setStringProperty("qpid.subject", "NASDAQ.ABCD"); @@ -860,7 +860,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); } - + public void testXSubscribeOverrides() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); @@ -873,41 +873,41 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); } catch(Exception e) - { + { } } - + public void testQueueReceiversAndTopicSubscriber() throws Exception { Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}"); Topic topic = new AMQAnyDestination("ADDR:amq.topic/test"); - + QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); QueueReceiver receiver = qSession.createReceiver(queue); - + TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber sub = tSession.createSubscriber(topic); - + Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue")); prod1.send(ssn.createTextMessage("test1")); - + MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test")); prod2.send(ssn.createTextMessage("test2")); - + Message msg1 = receiver.receive(); assertNotNull(msg1); assertEquals("test1",((TextMessage)msg1).getText()); - + Message msg2 = sub.receive(); assertNotNull(msg2); - assertEquals("test2",((TextMessage)msg2).getText()); + assertEquals("test2",((TextMessage)msg2).getText()); } - + public void testDurableSubscriber() throws Exception { - Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + String bindingStr = "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}"; Properties props = new Properties(); @@ -916,19 +916,19 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr); props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr); String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; - props.setProperty("destination.address5", addrStr); - - Context ctx = new InitialContext(props); + props.setProperty("destination.address5", addrStr); + + Context ctx = new InitialContext(props); for (int i=1; i < 4; i++) { Topic topic = (Topic) ctx.lookup("address"+i); createDurableSubscriber(ctx,ssn,"address"+i,topic,"ADDR:amq.topic/test"); } - + Topic topic = ssn.createTopic("ADDR:news.us"); createDurableSubscriber(ctx,ssn,"my-dest",topic,"ADDR:news.us"); - + Topic namedQueue = (Topic) ctx.lookup("address5"); try { @@ -1001,10 +1001,10 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception - { + { MessageConsumer cons = ssn.createDurableSubscriber(topic, destName); MessageProducer prod = ssn.createProducer(ssn.createTopic(producerAddr)); - + Message m = ssn.createTextMessage(destName); prod.send(m); Message msg = cons.receive(1000); @@ -1012,12 +1012,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertEquals(destName,((TextMessage)msg).getText()); ssn.unsubscribe(destName); } - + public void testDeleteOptions() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageConsumer cons; - + // default (create never, assert never) ------------------- // create never -------------------------------------------- String addr1 = "ADDR:testQueue1;{create: always, delete: always}"; @@ -1031,11 +1031,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - - + + String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; dest = new AMQAnyDestination(addr2); try @@ -1047,11 +1047,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); - + String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; dest = new AMQAnyDestination(addr3); try @@ -1064,43 +1064,43 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); } - + /** * Test Goals : 1. Test if the client sets the correct accept mode for unreliable * and at-least-once. * 2. Test default reliability modes for Queues and Topics. * 3. Test if an exception is thrown if exactly-once is used. * 4. Test if an exception is thrown if at-least-once is used with topics. - * + * * Test Strategy: For goal #1 & #2 * For unreliable and at-least-once the test tries to receives messages * in client_ack mode but does not ack the messages. * It will then close the session, recreate a new session * and will then try to verify the queue depth. * For unreliable the messages should have been taken off the queue. - * For at-least-once the messages should be put back onto the queue. - * + * For at-least-once the messages should be put back onto the queue. + * */ - + public void testReliabilityOptions() throws Exception { String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}"; acceptModeTest(addr1,0); - + String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}"; acceptModeTest(addr2,2); - + // Default accept-mode for topics - acceptModeTest("ADDR:amq.topic/test",0); - + acceptModeTest("ADDR:amq.topic/test",0); + // Default accept-mode for queues acceptModeTest("ADDR:testQueue1;{create: always}",2); - - String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; + + String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; try { AMQAnyDestination dest = new AMQAnyDestination(addr3); @@ -1111,83 +1111,83 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported")); } } - + private void acceptModeTest(String address, int expectedQueueDepth) throws Exception { Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); MessageConsumer cons; MessageProducer prod; - + AMQDestination dest = new AMQAnyDestination(address); cons = ssn.createConsumer(dest); prod = ssn.createProducer(dest); - + for (int i=0; i < expectedQueueDepth; i++) { prod.send(ssn.createTextMessage("Msg" + i)); } - + for (int i=0; i < expectedQueueDepth; i++) { Message msg = cons.receive(1000); assertNotNull(msg); assertEquals("Msg" + i,((TextMessage)msg).getText()); } - + ssn.close(); ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); - assertEquals(expectedQueueDepth,queueDepth); + long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); + assertEquals(expectedQueueDepth,queueDepth); cons.close(); - prod.close(); + prod.close(); } - + public void testDestinationOnSend() throws Exception { Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test")); MessageProducer prod = ssn.createProducer(null); - + Topic queue = ssn.createTopic("ADDR:amq.topic/test"); prod.send(queue,ssn.createTextMessage("A")); - + Message msg = cons.receive(1000); assertNotNull(msg); assertEquals("A",((TextMessage)msg).getText()); prod.close(); cons.close(); } - + public void testReplyToWithNamelessExchange() throws Exception { System.setProperty("qpid.declare_exchanges","false"); replyToTest("ADDR:my-queue;{create: always}"); System.setProperty("qpid.declare_exchanges","true"); } - + public void testReplyToWithCustomExchange() throws Exception { replyToTest("ADDR:hello;{create:always,node:{type:topic}}"); } - + private void replyToTest(String replyTo) throws Exception { - Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination replyToDest = AMQDestination.createDestination(replyTo); MessageConsumer replyToCons = session.createConsumer(replyToDest); - + Destination dest = session.createQueue("ADDR:amq.direct/test"); - + MessageConsumer cons = session.createConsumer(dest); MessageProducer prod = session.createProducer(dest); Message m = session.createTextMessage("test"); m.setJMSReplyTo(replyToDest); prod.send(m); - + Message msg = cons.receive(); MessageProducer prodR = session.createProducer(msg.getJMSReplyTo()); prodR.send(session.createTextMessage("x")); - + Message m1 = replyToCons.receive(); assertNotNull("The reply to consumer should have received the messsage",m1); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java index 626592dc10..5dcf678510 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java @@ -207,21 +207,21 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener } assertTrue("No exception thrown!", caught); caught = false; - + } - + public void testRuntimeSelectorError() throws JMSException { Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(_destination , "testproperty % 5 = 1"); MessageProducer producer = session.createProducer(_destination); Message sentMsg = session.createTextMessage(); - + sentMsg.setIntProperty("testproperty", 1); // 1 % 5 producer.send(sentMsg); Message recvd = consumer.receive(RECIEVE_TIMEOUT); assertNotNull(recvd); - + sentMsg.setStringProperty("testproperty", "hello"); // "hello" % 5 makes no sense producer.send(sentMsg); try @@ -231,47 +231,47 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener } catch (Exception e) { - + } assertFalse("Connection should not be closed", _connection.isClosed()); } - + public void testSelectorWithJMSMessageID() throws Exception { Session session = _connection.createSession(true, Session.SESSION_TRANSACTED); - + MessageProducer prod = session.createProducer(_destination); MessageConsumer consumer = session.createConsumer(_destination,"JMSMessageID IS NOT NULL"); - + for (int i=0; i<2; i++) { Message msg = session.createTextMessage("Msg" + String.valueOf(i)); prod.send(msg); } session.commit(); - + Message msg1 = consumer.receive(1000); Message msg2 = consumer.receive(1000); - + Assert.assertNotNull("Msg1 should not be null", msg1); Assert.assertNotNull("Msg2 should not be null", msg2); - + session.commit(); - + prod.setDisableMessageID(true); - - for (int i=0; i<2; i++) + + for (int i=2; i<4; i++) { Message msg = session.createTextMessage("Msg" + String.valueOf(i)); prod.send(msg); } - + session.commit(); - Message msg3 = consumer.receive(1000); + Message msg3 = consumer.receive(1000); Assert.assertNull("Msg3 should be null", msg3); session.commit(); consumer = session.createConsumer(_destination,"JMSMessageID IS NULL"); - + Message msg4 = consumer.receive(1000); Message msg5 = consumer.receive(1000); session.commit(); diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index e861b4f4ee..f8ab593c88 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -56,19 +56,19 @@ public class StreamMessageTest extends QpidBrokerTestCase public void testStreamMessageEOF() throws Exception { - Connection con = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL( ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); + FieldTable ft = new FieldTable(); ft.setString("x-match", "any"); ft.setString("F1000", "1"); - MessageConsumer consumer = - consumerSession.createConsumer(queue, Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), Integer.parseInt(ClientProperties.MAX_PREFETCH_DEFAULT), false, false, (String) null, ft); - + consumerSession.declareAndBind(queue, ft); + MessageConsumer consumer = consumerSession.createConsumer(queue); // force synch to ensure the consumer has resulted in a bound queue // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); // This is the default now diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 5dae98fe21..6bf20d7708 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.test.unit.topic; +import javax.jms.JMSException; +import javax.naming.NamingException; +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; @@ -37,6 +40,7 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import org.apache.qpid.url.URLSyntaxException; /** @author Apache Software Foundation */ @@ -225,6 +229,44 @@ public class TopicSessionTest extends QpidBrokerTestCase AMQTopic topic = new AMQTopic(con, "testNoLocal"); + noLocalTest(con, topic); + + + con.close(); + } + + + public void testNoLocalDirectExchange() throws Exception + { + + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + + AMQTopic topic = new AMQTopic("direct://amq.direct/testNoLocal/testNoLocal?routingkey='testNoLocal',exclusive='true',autodelete='true'"); + + noLocalTest(con, topic); + + + con.close(); + } + + + + public void testNoLocalFanoutExchange() throws Exception + { + + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + + AMQTopic topic = new AMQTopic("fanout://amq.fanout/testNoLocal/testNoLocal?routingkey='testNoLocal',exclusive='true',autodelete='true'"); + + noLocalTest(con, topic); + + con.close(); + } + + + private void noLocalTest(AMQConnection con, AMQTopic topic) + throws JMSException, URLSyntaxException, AMQException, NamingException + { TopicSession session1 = con.createTopicSession(true, AMQSession.AUTO_ACKNOWLEDGE); TopicSubscriber noLocal = session1.createSubscriber(topic, "", true); @@ -304,9 +346,6 @@ public class TopicSessionTest extends QpidBrokerTestCase //test nolocal subscriber does message m = (TextMessage) noLocal.receive(1000); assertNotNull(m); - - - con.close(); con2.close(); } |
