From f47c28c9ee2fb8f2967a221a28912060edcba749 Mon Sep 17 00:00:00 2001 From: Alex Rudyy Date: Wed, 24 Oct 2012 00:05:45 +0000 Subject: QPID-4389: Send the selector of durable subscriber in arguments of ExchangeBind command git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1401515 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSession_0_10.java | 19 ++++--- .../destination/AddressBasedDestinationTest.java | 60 +++++++++++++++++++++- 2 files changed, 71 insertions(+), 8 deletions(-) (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index e271436c21..12b174198a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.address.AddressHelper; import org.apache.qpid.client.messaging.address.Link; import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.client.messaging.address.Node; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -624,7 +625,9 @@ public class AMQSession_0_10 extends AMQSession bindingArguments = new HashMap(); + bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); getQpidSession().exchangeBind(queueName, - dest.getAddressName(), - dest.getSubject(), - Collections.emptyMap()); + dest.getAddressName(), + dest.getSubject(), + bindingArguments); } - + public void setLegacyFieldsForQueueType(AMQDestination dest) { // legacy support diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 08ee70f072..974e8d6e96 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -941,7 +941,65 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase e.getMessage()); } } - + + public void testDurableSubscription() throws Exception + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("ADDR:amq.topic/" + getTestQueueName()); + MessageProducer publisher = session.createProducer(topic); + MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestQueueName()); + + TextMessage messageToSend = session.createTextMessage("Test0"); + publisher.send(messageToSend); + ((AMQSession)session).sync(); + + Message receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + + subscriber.close(); + + messageToSend = session.createTextMessage("Test1"); + publisher.send(messageToSend); + ((AMQSession)session).sync(); + + subscriber = session.createDurableSubscriber(topic, getTestQueueName()); + receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + } + + public void testDurableSubscriptionnWithSelector() throws Exception + { + Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("ADDR:amq.topic/" + getTestQueueName()); + MessageProducer publisher = session.createProducer(topic); + MessageConsumer subscriber = session.createDurableSubscriber(topic, getTestQueueName(), "id=1", false); + + TextMessage messageToSend = session.createTextMessage("Test0"); + messageToSend.setIntProperty("id", 1); + publisher.send(messageToSend); + ((AMQSession)session).sync(); + + Message receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id")); + + subscriber.close(); + + messageToSend = session.createTextMessage("Test1"); + messageToSend.setIntProperty("id", 1); + publisher.send(messageToSend); + ((AMQSession)session).sync(); + + subscriber = session.createDurableSubscriber(topic, getTestQueueName(), "id=1", false); + receivedMessage = subscriber.receive(1000); + assertNotNull("Message has not been received", receivedMessage); + assertEquals("Unexpected message", messageToSend.getText(), ((TextMessage)receivedMessage).getText()); + assertEquals("Unexpected id", 1, receivedMessage.getIntProperty("id")); + } + private void createDurableSubscriber(Context ctx,Session ssn,String destName,Topic topic, String producerAddr) throws Exception { MessageConsumer cons = ssn.createDurableSubscriber(topic, destName); -- cgit v1.2.1